Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:03 UTC

[19/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
deleted file mode 100644
index 9a52330..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItemCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed work item coder.
- */
-public class SingletonKeyedWorkItemCoder<K, ElemT>
-    extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
-  /**
-   * Create a new {@link SingletonKeyedWorkItemCoder} with the provided key coder, element coder,
-   * and window coder.
-   */
-  public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
-      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
-    return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
-  @JsonCreator
-  public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
-    @SuppressWarnings("unchecked")
-    Coder<K> keyCoder = (Coder<K>) components.get(0);
-    @SuppressWarnings("unchecked")
-    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
-    @SuppressWarnings("unchecked")
-    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
-    return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
-  private final Coder<K> keyCoder;
-  private final Coder<ElemT> elemCoder;
-  private final Coder<? extends BoundedWindow> windowCoder;
-  private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder;
-
-  private SingletonKeyedWorkItemCoder(
-      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
-    this.keyCoder = keyCoder;
-    this.elemCoder = elemCoder;
-    this.windowCoder = windowCoder;
-    valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
-  }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<ElemT> getElementCoder() {
-    return elemCoder;
-  }
-
-  @Override
-  public void encode(SingletonKeyedWorkItem<K, ElemT> value,
-                     OutputStream outStream,
-                     Context context)
-      throws CoderException, IOException {
-    keyCoder.encode(value.key(), outStream, context.nested());
-    valueCoder.encode(value.value, outStream, context);
-  }
-
-  @Override
-  public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
-      throws CoderException, IOException {
-    K key = keyCoder.decode(inStream, context.nested());
-    WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
-    return new SingletonKeyedWorkItem<>(key, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return ImmutableList.of(keyCoder, elemCoder, windowCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    keyCoder.verifyDeterministic();
-    elemCoder.verifyDeterministic();
-    windowCoder.verifyDeterministic();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>{@link SingletonKeyedWorkItemCoder} is not consistent with equals as it can return a
-   * {@link KeyedWorkItem} of a type different from the originally encoded type.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return false;
-  }
-
-}
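
For reference, a minimal round-trip sketch of the coder removed above, assuming the companion SingletonKeyedWorkItem class from the same (removed) package; the key, element, and window coders below are illustrative choices, not the only valid ones:

    // build a coder over String keys and Integer elements in the global window
    SingletonKeyedWorkItemCoder<String, Integer> coder =
        SingletonKeyedWorkItemCoder.of(
            StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);

    SingletonKeyedWorkItem<String, Integer> item =
        new SingletonKeyedWorkItem<>("key", WindowedValue.valueInGlobalWindow(42));

    // encode/decode through the SDK helpers; note that equality of the decoded
    // work item is not guaranteed, since the coder is not consistentWithEquals()
    byte[] bytes = CoderUtils.encodeToByteArray(coder, item);
    SingletonKeyedWorkItem<String, Integer> roundTripped =
        CoderUtils.decodeFromByteArray(coder, bytes);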

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
deleted file mode 100644
index 40f70e4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import org.apache.beam.runners.core.ElementAndRestriction;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
-import org.apache.beam.runners.core.OutputWindowedValue;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
- * the {@code @ProcessElement} method of a splittable {@link DoFn}.
- */
-public class SplittableDoFnOperator<
-    InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-    extends DoFnOperator<
-    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
-
-  public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
-      Coder<
-          WindowedValue<
-              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
-      List<TupleTag<?>> additionalOutputTags,
-      OutputManagerFactory<OutputT> outputManagerFactory,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      Collection<PCollectionView<?>> sideInputs,
-      PipelineOptions options,
-      Coder<?> keyCoder) {
-    super(
-        doFn,
-        inputCoder,
-        mainOutputTag,
-        additionalOutputTags,
-        outputManagerFactory,
-        windowingStrategy,
-        sideInputTagMapping,
-        sideInputs,
-        options,
-        keyCoder);
-
-  }
-
-  @Override
-  public void open() throws Exception {
-    super.open();
-
-    checkState(doFn instanceof SplittableParDo.ProcessFn);
-
-    StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
-      @Override
-      public StateInternals<String> stateInternalsForKey(String key) {
-        //this will implicitly be keyed by the key of the incoming
-        // element or by the key of a firing timer
-        return (StateInternals<String>) stateInternals;
-      }
-    };
-    TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
-      @Override
-      public TimerInternals timerInternalsForKey(String key) {
-        //this will implicitly be keyed like the StateInternalsFactory
-        return timerInternals;
-      }
-    };
-
-    ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
-    ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
-    ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker(
-        new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
-            doFn,
-            serializedOptions.getPipelineOptions(),
-            new OutputWindowedValue<FnOutputT>() {
-              @Override
-              public void outputWindowedValue(
-                  FnOutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(
-                    mainOutputTag,
-                    WindowedValue.of(output, timestamp, windows, pane));
-              }
-
-              @Override
-              public <AdditionalOutputT> void outputWindowedValue(
-                  TupleTag<AdditionalOutputT> tag,
-                  AdditionalOutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
-              }
-            },
-            sideInputReader,
-            Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
-            10000,
-            Duration.standardSeconds(10)));
-  }
-
-  @Override
-  public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
-    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
-            (String) stateInternals.getKey(),
-            Collections.singletonList(timer.getNamespace()))));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
deleted file mode 100644
index 9b2136c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static org.apache.beam.runners.core.TimerInternals.TimerData;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-
-/**
- * Flink operator for executing window {@link DoFn DoFns}.
- */
-public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
-
-  private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
-
-  public WindowDoFnOperator(
-      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
-      Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
-      TupleTag<KV<K, OutputT>> mainOutputTag,
-      List<TupleTag<?>> additionalOutputTags,
-      OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      Collection<PCollectionView<?>> sideInputs,
-      PipelineOptions options,
-      Coder<K> keyCoder) {
-    super(
-        null,
-        inputCoder,
-        mainOutputTag,
-        additionalOutputTags,
-        outputManagerFactory,
-        windowingStrategy,
-        sideInputTagMapping,
-        sideInputs,
-        options,
-        keyCoder);
-
-    this.systemReduceFn = systemReduceFn;
-
-  }
-
-  @Override
-  protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
-    StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
-      @Override
-      public StateInternals<K> stateInternalsForKey(K key) {
-        //this will implicitly be keyed by the key of the incoming
-        // element or by the key of a firing timer
-        return (StateInternals<K>) stateInternals;
-      }
-    };
-    TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
-      @Override
-      public TimerInternals timerInternalsForKey(K key) {
-        //this will implicitly be keyed like the StateInternalsFactory
-        return timerInternals;
-      }
-    };
-
-    // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetNewDoFn.create
-    // has the window type as generic parameter while WindowingStrategy is almost always
-    // untyped.
-    @SuppressWarnings("unchecked")
-    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
-        GroupAlsoByWindowViaWindowSetNewDoFn.create(
-            windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
-                (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
-    return doFn;
-  }
-
-  @Override
-  public void fireTimer(InternalTimer<?, TimerData> timer) {
-    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-        KeyedWorkItems.<K, InputT>timersWorkItem(
-            (K) stateInternals.getKey(),
-            Collections.singletonList(timer.getNamespace()))));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
deleted file mode 100644
index 1dff367..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that retrieves a key from a {@link KeyedWorkItem}. This will return
- * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures
- * that all key comparisons/hashing happen on the encoded form.
- */
-public class WorkItemKeySelector<K, V>
-    implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
-    ResultTypeQueryable<ByteBuffer> {
-
-  private final Coder<K> keyCoder;
-
-  public WorkItemKeySelector(Coder<K> keyCoder) {
-    this.keyCoder = keyCoder;
-  }
-
-  @Override
-  public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception {
-    K key = value.getValue().key();
-    byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
-    return ByteBuffer.wrap(keyBytes);
-  }
-
-  @Override
-  public TypeInformation<ByteBuffer> getProducedType() {
-    return new GenericTypeInfo<>(ByteBuffer.class);
-  }
-}
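
The encoded-form guarantee in the javadoc above is easy to see in isolation: with any deterministic coder (StringUtf8Coder is used here purely for illustration), equal keys produce byte-equal and therefore hash-equal ByteBuffers, so Flink's grouping never depends on the key type's own equals()/hashCode(). A short sketch:

    Coder<String> keyCoder = StringUtf8Coder.of();
    ByteBuffer a = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, "user-1"));
    ByteBuffer b = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, "user-1"));
    // both buffers wrap identical bytes, so they compare and hash identically
    assert a.equals(b) && a.hashCode() == b.hashCode();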

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 2ed5024..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements StoppableFunction {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
-  /**
-   * Keep the options so that we can initialize the readers.
-   */
-  private final SerializedPipelineOptions serializedOptions;
-
-  /**
-   * The split sources. We split them in the constructor to ensure that all parallel
-   * sources are consistent about the split sources.
-   */
-  private List<? extends BoundedSource<OutputT>> splitSources;
-
-  /**
-   * Make it a field so that we can access it in {@link #close()}.
-   */
-  private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
-  /**
-   * Initialize here and not in run() to prevent races where we cancel a job before run() is
-   * ever called or run() is called after cancel().
-   */
-  private volatile boolean isRunning = true;
-
-  @SuppressWarnings("unchecked")
-  public BoundedSourceWrapper(
-      PipelineOptions pipelineOptions,
-      BoundedSource<OutputT> source,
-      int parallelism) throws Exception {
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
-    long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
-    // get the splits early. we assume that the generated splits are stable,
-    // this is necessary so that the mapping of state to source is correct
-    // when restoring
-    splitSources = source.split(desiredBundleSize, pipelineOptions);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
-    // figure out which split sources we're responsible for
-    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-    List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
-
-    LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    readers = new ArrayList<>();
-    // initialize readers from scratch
-    for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.getPipelineOptions()));
-    }
-
-    if (readers.size() == 1) {
-      // the easy case, we just read from one reader
-      BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
-      boolean dataAvailable = reader.start();
-      if (dataAvailable) {
-        emitElement(ctx, reader);
-      }
-
-      while (isRunning) {
-        dataAvailable = reader.advance();
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        } else {
-          break;
-        }
-      }
-    } else {
-      // a bit more complicated, we are responsible for several readers
-      // loop through them and sleep if none of them had any data
-
-      int currentReader = 0;
-
-      // start each reader and emit data if immediately available
-      for (BoundedSource.BoundedReader<OutputT> reader : readers) {
-        boolean dataAvailable = reader.start();
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        }
-      }
-
-      // a flag telling us whether any of the readers had data
-      // if no reader had data, sleep for a bit
-      boolean hadData = false;
-      while (isRunning && !readers.isEmpty()) {
-        BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
-        boolean dataAvailable = reader.advance();
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
-        } else {
-          readers.remove(currentReader);
-          currentReader--;
-          if (readers.isEmpty()) {
-            break;
-          }
-        }
-
-        currentReader = (currentReader + 1) % readers.size();
-        if (currentReader == 0 && !hadData) {
-          Thread.sleep(50);
-        } else if (currentReader == 0) {
-          hadData = false;
-        }
-      }
-
-    }
-
-    // emit final Long.MAX_VALUE watermark, just to be sure
-    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-  }
-
-  /**
-   * Emit the current element from the given Reader. The reader is guaranteed to have data.
-   */
-  private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
-      BoundedSource.BoundedReader<OutputT> reader) {
-    // make sure that reader state update and element emission are atomic
-    // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    if (readers != null) {
-      for (BoundedSource.BoundedReader<OutputT> reader: readers) {
-        reader.close();
-      }
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void stop() {
-    this.isRunning = false;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends BoundedSource<OutputT>> getSplitSources() {
-    return splitSources;
-  }
-}
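
The split assignment in run() above is a plain modulo partition, which is why every parallel instance can compute its share independently, with no coordination. A self-contained sketch with hypothetical numbers (10 splits over 4 subtasks):

    int numSplits = 10;
    int numSubtasks = 4;
    for (int subtask = 0; subtask < numSubtasks; subtask++) {
      List<Integer> assigned = new ArrayList<>();
      for (int i = 0; i < numSplits; i++) {
        if (i % numSubtasks == subtask) {
          assigned.add(i);  // the same rule used in run() above
        }
      }
      // subtask 0 reads splits [0, 4, 8], subtask 1 reads [1, 5, 9], ...
      System.out.println("subtask " + subtask + " reads splits " + assigned);
    }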

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
deleted file mode 100644
index 910a33f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An example unbounded Beam source that reads input from a socket.
- * This is used mainly for testing and debugging.
- */
-public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    extends UnboundedSource<String, CheckpointMarkT> {
-
-  private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
-
-  private static final long serialVersionUID = 1L;
-
-  private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
-  private static final int CONNECTION_TIMEOUT_TIME = 0;
-
-  private final String hostname;
-  private final int port;
-  private final char delimiter;
-  private final long maxNumRetries;
-  private final long delayBetweenRetries;
-
-  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
-    this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
-  }
-
-  public UnboundedSocketSource(String hostname,
-                               int port,
-                               char delimiter,
-                               long maxNumRetries,
-                               long delayBetweenRetries) {
-    this.hostname = hostname;
-    this.port = port;
-    this.delimiter = delimiter;
-    this.maxNumRetries = maxNumRetries;
-    this.delayBetweenRetries = delayBetweenRetries;
-  }
-
-  public String getHostname() {
-    return this.hostname;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public char getDelimiter() {
-    return this.delimiter;
-  }
-
-  public long getMaxNumRetries() {
-    return this.maxNumRetries;
-  }
-
-  public long getDelayBetweenRetries() {
-    return this.delayBetweenRetries;
-  }
-
-  @Override
-  public List<? extends UnboundedSource<String, CheckpointMarkT>> split(
-      int desiredNumSplits,
-      PipelineOptions options) throws Exception {
-    return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);
-  }
-
-  @Override
-  public UnboundedReader<String> createReader(PipelineOptions options,
-                                              @Nullable CheckpointMarkT checkpointMark) {
-    return new UnboundedSocketReader(this);
-  }
-
-  @Nullable
-  @Override
-  public Coder getCheckpointMarkCoder() {
-    // Flink and Dataflow have different checkpointing mechanisms.
-    // In our case we do not need a coder.
-    return null;
-  }
-
-  @Override
-  public void validate() {
-    checkArgument(port > 0 && port < 65536, "port is out of range");
-    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), "
-        + "or -1 (infinite retries)");
-    checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
-  }
-
-  @Override
-  public Coder getDefaultOutputCoder() {
-    return DEFAULT_SOCKET_CODER;
-  }
-
-  /**
-   * Unbounded socket reader.
-   */
-  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
-
-    private final UnboundedSocketSource source;
-
-    private Socket socket;
-    private BufferedReader reader;
-
-    private boolean isRunning;
-
-    private String currentRecord;
-
-    public UnboundedSocketReader(UnboundedSocketSource source) {
-      this.source = source;
-    }
-
-    private void openConnection() throws IOException {
-      this.socket = new Socket();
-      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()),
-          CONNECTION_TIMEOUT_TIME);
-      this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
-      this.isRunning = true;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      int attempt = 0;
-      while (!isRunning) {
-        try {
-          openConnection();
-          LOG.info("Connected to server socket " + this.source.getHostname() + ':'
-              + this.source.getPort());
-
-          return advance();
-        } catch (IOException e) {
-          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':'
-              + this.source.getPort() + ". Retrying in "
-              + this.source.getDelayBetweenRetries() + " msecs...");
-
-          if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
-            try {
-              Thread.sleep(this.source.getDelayBetweenRetries());
-            } catch (InterruptedException e1) {
-              e1.printStackTrace();
-            }
-          } else {
-            this.isRunning = false;
-            break;
-          }
-        }
-      }
-      LOG.error("Unable to connect to host " + this.source.getHostname()
-          + " : " + this.source.getPort());
-      return false;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      final StringBuilder buffer = new StringBuilder();
-      int data;
-      while (isRunning && (data = reader.read()) != -1) {
-        // check if the string is complete
-        if (data != this.source.getDelimiter()) {
-          buffer.append((char) data);
-        } else {
-          if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
-            buffer.setLength(buffer.length() - 1);
-          }
-          this.currentRecord = buffer.toString();
-          buffer.setLength(0);
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-      return new byte[0];
-    }
-
-    @Override
-    public String getCurrent() throws NoSuchElementException {
-      return this.currentRecord;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return Instant.now();
-    }
-
-    @Override
-    public void close() throws IOException {
-      this.reader.close();
-      this.socket.close();
-      this.isRunning = false;
-      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":"
-          + this.source.getPort() + ".");
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return Instant.now();
-    }
-
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<String, ?> getCurrentSource() {
-      return this.source;
-    }
-  }
-}
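
A hedged usage sketch: the source is consumed through the SDK's Read.from(...). The pipeline variable as well as the host, port, delimiter, and retry count below are placeholders:

    Pipeline p = Pipeline.create(options);
    p.apply("ReadFromSocket",
        Read.from(new UnboundedSocketSource<UnboundedSource.CheckpointMark>(
            "localhost", 9999, '\n', 3)));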

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
deleted file mode 100644
index bb9b58a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source.
- */
-public class UnboundedSourceWrapper<
-    OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements ProcessingTimeCallback, StoppableFunction,
-    CheckpointListener, CheckpointedFunction {
-
-  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
-
-  /**
-   * Keep the options so that we can initialize the localReaders.
-   */
-  private final SerializedPipelineOptions serializedOptions;
-
-  /**
-   * For snapshot and restore.
-   */
-  private final KvCoder<
-      ? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder;
-
-  /**
-   * The split sources. We split them in the constructor to ensure that all parallel
-   * sources are consistent about the split sources.
-   */
-  private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
-
-  /**
-   * The local split sources. Assigned at runtime when the wrapper is executed in parallel.
-   */
-  private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
-
-  /**
-   * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
-   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for
-   * emitting watermarks.
-   */
-  private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
-
-  /**
-   * Flag to indicate whether the source is running.
-   * Initialize here and not in run() to prevent races where we cancel a job before run() is
-   * ever called or run() is called after cancel().
-   */
-  private volatile boolean isRunning = true;
-
-  /**
-   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for registering new
-   * triggers.
-   */
-  private transient StreamingRuntimeContext runtimeContext;
-
-  /**
-   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
-   * watermarks.
-   */
-  private transient SourceContext<WindowedValue<OutputT>> context;
-
-  /**
-   * Pending checkpoints which have not been acknowledged yet.
-   */
-  private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints;
-  /**
-   * Keep a maximum of 32 checkpoints for {@code CheckpointMark.finalizeCheckpoint()}.
-   */
-  private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
-
-  private transient ListState<KV<? extends
-      UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint;
-
-  /**
-   * False if the checkpointCoder is null or if there was no state to restore at startup.
-   */
-  private transient boolean isRestored = false;
-
-  @SuppressWarnings("unchecked")
-  public UnboundedSourceWrapper(
-      PipelineOptions pipelineOptions,
-      UnboundedSource<OutputT, CheckpointMarkT> source,
-      int parallelism) throws Exception {
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
-    if (source.requiresDeduping()) {
-      LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
-    }
-
-    Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder();
-    if (checkpointMarkCoder == null) {
-      LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
-      checkpointCoder = null;
-    } else {
-
-      Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
-          (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {
-          });
-
-      checkpointCoder = KvCoder.of(sourceCoder, checkpointMarkCoder);
-    }
-
-    // get the splits early. we assume that the generated splits are stable,
-    // this is necessary so that the mapping of state to source is correct
-    // when restoring
-    splitSources = source.split(parallelism, pipelineOptions);
-  }
-
-
-  /**
-   * Initialize and restore state before starting execution of the source.
-   */
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
-
-    // figure out which split sources we're responsible for
-    int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
-    int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
-    localSplitSources = new ArrayList<>();
-    localReaders = new ArrayList<>();
-
-    pendingCheckpoints = new LinkedHashMap<>();
-
-    if (isRestored) {
-      // restore the splitSources from the checkpoint to ensure consistent ordering
-      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
-          stateForCheckpoint.get()) {
-        localSplitSources.add(restored.getKey());
-        localReaders.add(restored.getKey().createReader(
-            serializedOptions.getPipelineOptions(), restored.getValue()));
-      }
-    } else {
-      // initialize localReaders and localSources from scratch
-      for (int i = 0; i < splitSources.size(); i++) {
-        if (i % numSubtasks == subtaskIndex) {
-          UnboundedSource<OutputT, CheckpointMarkT> source =
-              splitSources.get(i);
-          UnboundedSource.UnboundedReader<OutputT> reader =
-              source.createReader(serializedOptions.getPipelineOptions(), null);
-          localSplitSources.add(source);
-          localReaders.add(reader);
-        }
-      }
-    }
-
-    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSplitSources);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
-    context = ctx;
-
-    if (localReaders.size() == 0) {
-      // do nothing, but still look busy ...
-      // also, output a Long.MAX_VALUE watermark since we know that we're not
-      // going to emit anything
-      // we can't return here since Flink requires that all operators stay up,
-      // otherwise checkpointing would not work correctly anymore
-      ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-      // wait until this is canceled
-      final Object waitLock = new Object();
-      while (isRunning) {
-        try {
-          // Flink will interrupt us at some point
-          //noinspection SynchronizationOnLocalVariableOrMethodParameter
-          synchronized (waitLock) {
-            // don't wait indefinitely, in case something goes horribly wrong
-            waitLock.wait(1000);
-          }
-        } catch (InterruptedException e) {
-          if (!isRunning) {
-            // restore the interrupted state, and fall through the loop
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    } else if (localReaders.size() == 1) {
-      // the easy case, we just read from one reader
-      UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
-
-      boolean dataAvailable = reader.start();
-      if (dataAvailable) {
-        emitElement(ctx, reader);
-      }
-
-      setNextWatermarkTimer(this.runtimeContext);
-
-      while (isRunning) {
-        dataAvailable = reader.advance();
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        } else {
-          Thread.sleep(50);
-        }
-      }
-    } else {
-      // a bit more complicated, we are responsible for several localReaders
-      // loop through them and sleep if none of them had any data
-
-      int numReaders = localReaders.size();
-      int currentReader = 0;
-
-      // start each reader and emit data if immediately available
-      for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
-        boolean dataAvailable = reader.start();
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        }
-      }
-
-      // a flag telling us whether any of the localReaders had data
-      // if no reader had data, sleep for a bit
-      boolean hadData = false;
-      while (isRunning) {
-        UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
-        boolean dataAvailable = reader.advance();
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
-        }
-
-        currentReader = (currentReader + 1) % numReaders;
-        if (currentReader == 0 && !hadData) {
-          Thread.sleep(50);
-        } else if (currentReader == 0) {
-          hadData = false;
-        }
-      }
-
-    }
-  }
-
-  /**
-   * Emit the current element from the given Reader. The reader is guaranteed to have data.
-   */
-  private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
-      UnboundedSource.UnboundedReader<OutputT> reader) {
-    // make sure that reader state update and element emission are atomic
-    // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    if (localReaders != null) {
-      for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
-        reader.close();
-      }
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void stop() {
-    isRunning = false;
-  }
-
-  // ------------------------------------------------------------------------
-  //  Checkpoint and restore
-  // ------------------------------------------------------------------------
-
-  @Override
-  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
-    if (!isRunning) {
-      LOG.debug("snapshotState() called on closed source");
-    } else {
-
-      if (checkpointCoder == null) {
-        // no checkpoint coder available in this source
-        return;
-      }
-
-      stateForCheckpoint.clear();
-
-      long checkpointId = functionSnapshotContext.getCheckpointId();
-
-      // we checkpoint the sources along with the CheckpointMarkT to ensure
-      // that we have a correct mapping of checkpoints to sources when
-      // restoring
-      List<CheckpointMarkT> checkpointMarks = new ArrayList<>(localSplitSources.size());
-
-      for (int i = 0; i < localSplitSources.size(); i++) {
-        UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i);
-        UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i);
-
-        @SuppressWarnings("unchecked")
-        CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
-        checkpointMarks.add(mark);
-        KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv =
-            KV.of(source, mark);
-        stateForCheckpoint.add(kv);
-      }
-
-      // cleanup old pending checkpoints and add new checkpoint
-      int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
-      if (diff >= 0) {
-        for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
-             diff >= 0;
-             diff--) {
-          iterator.next();
-          iterator.remove();
-        }
-      }
-      pendingCheckpoints.put(checkpointId, checkpointMarks);
-
-    }
-  }
-
-  @Override
-  public void initializeState(FunctionInitializationContext context) throws Exception {
-    if (checkpointCoder == null) {
-      // no checkpoint coder available in this source
-      return;
-    }
-
-    OperatorStateStore stateStore = context.getOperatorStateStore();
-    CoderTypeInformation<
-        KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
-        typeInformation = (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder);
-    stateForCheckpoint = stateStore.getOperatorState(
-        new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
-            typeInformation.createSerializer(new ExecutionConfig())));
-
-    if (context.isRestored()) {
-      isRestored = true;
-      LOG.info("Having restore state in the UnbounedSourceWrapper.");
-    } else {
-      LOG.info("No restore state for UnbounedSourceWrapper.");
-    }
-  }
-
-  @Override
-  public void onProcessingTime(long timestamp) throws Exception {
-    if (this.isRunning) {
-      synchronized (context.getCheckpointLock()) {
-        // find minimum watermark over all localReaders
-        long watermarkMillis = Long.MAX_VALUE;
-        for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
-          Instant watermark = reader.getWatermark();
-          if (watermark != null) {
-            watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
-          }
-        }
-        context.emitWatermark(new Watermark(watermarkMillis));
-      }
-      setNextWatermarkTimer(this.runtimeContext);
-    }
-  }
-
-  private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
-    if (this.isRunning) {
-      long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
-      long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
-      runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this);
-    }
-  }
-
-  private long getTimeToNextWatermark(long watermarkInterval) {
-    return System.currentTimeMillis() + watermarkInterval;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
-    return splitSources;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
-    return localSplitSources;
-  }
-
-  @Override
-  public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
-    List<CheckpointMarkT> checkpointMarks = pendingCheckpoints.get(checkpointId);
-
-    if (checkpointMarks != null) {
-
-      // remove old checkpoints including the current one
-      Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
-      long currentId;
-      do {
-        currentId = iterator.next();
-        iterator.remove();
-      } while (currentId != checkpointId);
-
-      // confirm all marks
-      for (CheckpointMarkT mark : checkpointMarks) {
-        mark.finalizeCheckpoint();
-      }
-
-    }
-  }
-}
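
The pending-checkpoint cap in snapshotState() above leans on LinkedHashMap's insertion order: trimming from the head of the key iterator always evicts the oldest checkpoints first. A self-contained sketch with hypothetical data:

    LinkedHashMap<Long, List<String>> pending = new LinkedHashMap<>();
    int max = 32;  // mirrors MAX_NUMBER_PENDING_CHECKPOINTS
    for (long checkpointId = 1; checkpointId <= 40; checkpointId++) {
      // same trim as above: drop oldest entries until we are under the cap
      int diff = pending.size() - max;
      if (diff >= 0) {
        for (Iterator<Long> it = pending.keySet().iterator(); diff >= 0; diff--) {
          it.next();
          it.remove();
        }
      }
      pending.put(checkpointId, Collections.singletonList("mark-" + checkpointId));
    }
    // pending now holds only the most recent checkpoints: ids 9 through 40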

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
deleted file mode 100644
index b431ce7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
deleted file mode 100644
index 0674871..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
deleted file mode 100644
index 3203446..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ /dev/null
@@ -1,865 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-
-/**
- * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend}
- * to manage the broadcast state.
- * The state is the same on all parallel instances of the operator,
- * so we only need to store the state of subtask 0 in the OperatorStateBackend.
- *
- * <p>Note: The index of the key is ignored. This is mainly used for side inputs.
- */
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
-
-  private int indexInSubtaskGroup;
-  private final DefaultOperatorStateBackend stateBackend;
-  // stateName -> <namespace, state>
-  private Map<String, Map<String, ?>> stateForNonZeroOperator;
-
-  public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) {
-    // TODO: Flink does not yet expose the DefaultOperatorStateBackend through a public API.
-    this.stateBackend = (DefaultOperatorStateBackend) stateBackend;
-    this.indexInSubtaskGroup = indexInSubtaskGroup;
-    if (indexInSubtaskGroup != 0) {
-      stateForNonZeroOperator = new HashMap<>();
-    }
-  }
-
-  @Override
-  public K getKey() {
-    return null;
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address) {
-
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address,
-      final StateContext<?> context) {
-
-    return address.bind(new StateTag.StateBinder<K>() {
-
-      @Override
-      public <T> ValueState<T> bindValue(
-          StateTag<? super K, ValueState<T>> address,
-          Coder<T> coder) {
-
-        return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          StateTag<? super K, BagState<T>> address,
-          Coder<T> elemCoder) {
-
-        return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(
-          StateTag<? super K, SetState<T>> address,
-          Coder<T> elemCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", SetState.class.getSimpleName()));
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          StateTag<? super K, MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", MapState.class.getSimpleName()));
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT>
-      bindCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-
-        return new FlinkCombiningState<>(
-            stateBackend, address, combineFn, namespace, accumCoder);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkKeyedCombiningState<>(
-            stateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkBroadcastStateInternals.this);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<
-              ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkCombiningStateWithContext<>(
-            stateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkBroadcastStateInternals.this,
-            CombineContextFactory.createFromStateContext(context));
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
-      }
-    });
-  }
-
-  /**
-   * 1. Only the operator with subtask index 0 checkpoints any state, because we
-   * assume that the state is the same on all parallel instances of the operator.
-   *
-   * <p>2. A map is used to support namespaces.
-   */
-  private abstract class AbstractBroadcastState<T> {
-
-    private String name;
-    private final StateNamespace namespace;
-    private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
-    private final DefaultOperatorStateBackend flinkStateBackend;
-
-    AbstractBroadcastState(
-        DefaultOperatorStateBackend flinkStateBackend,
-        String name,
-        StateNamespace namespace,
-        Coder<T> coder) {
-      this.name = name;
-
-      this.namespace = namespace;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<Map<String, T>> typeInfo =
-          new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
-
-      flinkStateDescriptor = new ListStateDescriptor<>(name,
-          typeInfo.createSerializer(new ExecutionConfig()));
-    }
-
-    /**
-     * Get the map (namespace -> T) that is stored by subtask index 0.
-     */
-    Map<String, T> getMap() throws Exception {
-      if (indexInSubtaskGroup == 0) {
-        return getMapFromBroadcastState();
-      } else {
-        Map<String, T> result = (Map<String, T>) stateForNonZeroOperator.get(name);
-        // maybe restored from the broadcast state written by subtask 0
-        if (result == null) {
-          result = getMapFromBroadcastState();
-          if (result != null) {
-            stateForNonZeroOperator.put(name, result);
-            // we no longer need the backend copy, so clear it.
-            flinkStateBackend.getBroadcastOperatorState(
-                flinkStateDescriptor).clear();
-          }
-        }
-        return result;
-      }
-    }
-
-    Map<String, T> getMapFromBroadcastState() throws Exception {
-      ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
-          flinkStateDescriptor);
-      Iterable<Map<String, T>> iterable = state.get();
-      Map<String, T> ret = null;
-      if (iterable != null) {
-        // there is at most one map in the list; use the first entry
-        Iterator<Map<String, T>> iterator = iterable.iterator();
-        if (iterator.hasNext()) {
-          ret = iterator.next();
-        }
-      }
-      return ret;
-    }
-
-    /**
-     * Update the map (namespace -> T); only subtask index 0 writes to the broadcast state.
-     */
-    void updateMap(Map<String, T> map) throws Exception {
-      if (indexInSubtaskGroup == 0) {
-        ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
-            flinkStateDescriptor);
-        state.clear();
-        if (map.size() > 0) {
-          state.add(map);
-        }
-      } else {
-        if (map.size() == 0) {
-          stateForNonZeroOperator.remove(name);
-          // updateMap is always called after getMap, and getMap already
-          // clears the map in the BroadcastOperatorState, so there is
-          // nothing to clear here.
-        } else {
-          stateForNonZeroOperator.put(name, map);
-        }
-      }
-    }
-
-    void writeInternal(T input) {
-      try {
-        Map<String, T> map = getMap();
-        if (map == null) {
-          map = new HashMap<>();
-        }
-        map.put(namespace.stringKey(), input);
-        updateMap(map);
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    T readInternal() {
-      try {
-        Map<String, T> map = getMap();
-        if (map == null) {
-          return null;
-        } else {
-          return map.get(namespace.stringKey());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    void clearInternal() {
-      try {
-        Map<String, T> map = getMap();
-        if (map != null) {
-          map.remove(namespace.stringKey());
-          updateMap(map);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-  }
-
-  private class FlinkBroadcastValueState<K, T>
-      extends AbstractBroadcastState<T> implements ValueState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
-
-    FlinkBroadcastValueState(
-        DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-      super(flinkStateBackend, address.getId(), namespace, coder);
-
-      this.namespace = namespace;
-      this.address = address;
-    }
-
-    @Override
-    public void write(T input) {
-      writeInternal(input);
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      return readInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkBroadcastValueState<?, ?> that = (FlinkBroadcastValueState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-  }
-
-  private class FlinkBroadcastBagState<K, T> extends AbstractBroadcastState<List<T>>
-      implements BagState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
-
-    FlinkBroadcastBagState(
-        DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-      super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
-
-      this.namespace = namespace;
-      this.address = address;
-    }
-
-    @Override
-    public void add(T input) {
-      List<T> list = readInternal();
-      if (list == null) {
-        list = new ArrayList<>();
-      }
-      list.add(input);
-      writeInternal(list);
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      List<T> result = readInternal();
-      return result != null ? result : Collections.<T>emptyList();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            List<T> result = readInternal();
-            return result == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkBroadcastBagState<?, ?> that = (FlinkBroadcastBagState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private class FlinkCombiningState<K, InputT, AccumT, OutputT>
-      extends AbstractBroadcastState<AccumT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    FlinkCombiningState(
-        DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      AccumT current = readInternal();
-      if (current == null) {
-        current = combineFn.createAccumulator();
-      }
-      current = combineFn.addInput(current, value);
-      writeInternal(current);
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      AccumT current = readInternal();
-
-      if (current == null) {
-        writeInternal(accum);
-      } else {
-        current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
-        writeInternal(current);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      return readInternal();
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      AccumT accum = readInternal();
-      if (accum != null) {
-        return combineFn.extractOutput(accum);
-      } else {
-        return combineFn.extractOutput(combineFn.createAccumulator());
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return readInternal() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkCombiningState<?, ?, ?, ?> that =
-          (FlinkCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
-      extends AbstractBroadcastState<AccumT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
-
-    FlinkKeyedCombiningState(
-        DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K> flinkStateInternals) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateInternals = flinkStateInternals;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey());
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
-        writeInternal(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          writeInternal(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Arrays.asList(current, accum));
-          writeInternal(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return readInternal();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        AccumT accum = readInternal();
-        if (accum != null) {
-          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
-        } else {
-          return combineFn.extractOutput(
-              flinkStateInternals.getKey(),
-              combineFn.createAccumulator(flinkStateInternals.getKey()));
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return readInternal() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
-      extends AbstractBroadcastState<AccumT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final CombineWithContext.KeyedCombineFnWithContext<
-        ? super K, InputT, AccumT, OutputT> combineFn;
-    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
-    private final CombineWithContext.Context context;
-
-    FlinkCombiningStateWithContext(
-        DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        CombineWithContext.KeyedCombineFnWithContext<
-            ? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K> flinkStateInternals,
-        CombineWithContext.Context context) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateInternals = flinkStateInternals;
-      this.context = context;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
-        writeInternal(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          writeInternal(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Arrays.asList(current, accum),
-              context);
-          writeInternal(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return readInternal();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        AccumT accum = readInternal();
-        if (accum == null) {
-          // match the other combining states: treat an absent accumulator as empty
-          accum = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-        }
-        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return readInternal() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-}
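
The FlinkBroadcastStateInternals class above boils down to one idea: each state cell is a (namespace -> value) map held in operator state, persisted only by subtask 0, and cached locally by the other subtasks after at most one restore from the shared backend. A minimal single-JVM sketch of that idea; BroadcastMapSketch and its fields are hypothetical and use none of the Beam or Flink APIs:

    import java.util.HashMap;
    import java.util.Map;

    public class BroadcastMapSketch<T> {

      private final int subtaskIndex;
      // Stand-in for the ListState<Map<String, T>> in the operator state
      // backend; null means "no map stored".
      private Map<String, T> backend;
      // Local cache used by every subtask other than 0.
      private Map<String, T> localCopy;

      BroadcastMapSketch(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
      }

      Map<String, T> getMap() {
        if (subtaskIndex == 0) {
          return backend;
        }
        if (localCopy == null && backend != null) {
          localCopy = backend; // one-time restore from subtask 0's checkpoint
          backend = null;      // then drop it; only subtask 0 keeps it persisted
        }
        return localCopy;
      }

      void updateMap(Map<String, T> map) {
        if (subtaskIndex == 0) {
          backend = map.isEmpty() ? null : map;
        } else {
          localCopy = map.isEmpty() ? null : map;
        }
      }

      // Namespace-scoped write, mirroring writeInternal above.
      void write(String namespace, T value) {
        Map<String, T> map = getMap();
        if (map == null) {
          map = new HashMap<>();
        }
        map.put(namespace, value);
        updateMap(map);
      }
    }

For example, with parallelism 2, new BroadcastMapSketch<>(0) writes through to the backend while new BroadcastMapSketch<>(1) only touches its local copy, which is exactly why the original class comments stress that the state must be identical on all parallel instances.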