You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:02 UTC

[18/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
deleted file mode 100644
index 24b340e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyGroupsList;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-/**
- * {@link StateInternals} that uses {@link KeyGroupCheckpointedOperator}
- * to checkpoint state.
- *
- * <p>Note:
- * Ignore index of key.
- * Just implement BagState.
- *
- * <p>Reference from {@link HeapInternalTimerService} to the local key-group range.
- */
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
-
-  private final Coder<K> keyCoder;
-  private final KeyGroupsList localKeyGroupRange;
-  private KeyedStateBackend keyedStateBackend;
-  private final int localKeyGroupRangeStartIdx;
-
-  // stateName -> namespace -> (valueCoder, value)
-  private final Map<String, Tuple2<Coder<?>, Map<String, ?>>>[] stateTables;
-
-  public FlinkKeyGroupStateInternals(
-      Coder<K> keyCoder,
-      KeyedStateBackend keyedStateBackend) {
-    this.keyCoder = keyCoder;
-    this.keyedStateBackend = keyedStateBackend;
-    this.localKeyGroupRange = keyedStateBackend.getKeyGroupRange();
-    // find the starting index of the local key-group range
-    int startIdx = Integer.MAX_VALUE;
-    for (Integer keyGroupIdx : localKeyGroupRange) {
-      startIdx = Math.min(keyGroupIdx, startIdx);
-    }
-    this.localKeyGroupRangeStartIdx = startIdx;
-    stateTables = (Map<String, Tuple2<Coder<?>, Map<String, ?>>>[])
-        new Map[localKeyGroupRange.getNumberOfKeyGroups()];
-    for (int i = 0; i < stateTables.length; i++) {
-      stateTables[i] = new HashMap<>();
-    }
-  }
-
-  @Override
-  public K getKey() {
-    ByteBuffer keyBytes = (ByteBuffer) keyedStateBackend.getCurrentKey();
-    try {
-      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
-    } catch (CoderException e) {
-      throw new RuntimeException("Error decoding key.", e);
-    }
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address) {
-
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address,
-      final StateContext<?> context) {
-
-    return address.bind(new StateTag.StateBinder<K>() {
-
-      @Override
-      public <T> ValueState<T> bindValue(
-          StateTag<? super K, ValueState<T>> address,
-          Coder<T> coder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", ValueState.class.getSimpleName()));
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          StateTag<? super K, BagState<T>> address,
-          Coder<T> elemCoder) {
-
-        return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(
-          StateTag<? super K, SetState<T>> address,
-          Coder<T> elemCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", SetState.class.getSimpleName()));
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          StateTag<? super K, MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", MapState.class.getSimpleName()));
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT>
-      bindCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
-
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<
-              ? super K, InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException(
-            "bindKeyedCombiningValueWithContext is not supported.");
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", CombiningState.class.getSimpleName()));
-      }
-    });
-  }
-
-  /**
-   * Reference from {@link Combine.CombineFn}.
-   *
-   * <p>Accumulators are stored in each KeyGroup, call addInput() when a element comes,
-   * call extractOutput() to produce the desired value when need to read data.
-   */
-  interface KeyGroupCombiner<InputT, AccumT, OutputT> {
-
-    /**
-     * Returns a new, mutable accumulator value, representing the accumulation
-     * of zero input values.
-     */
-    AccumT createAccumulator();
-
-    /**
-     * Adds the given input value to the given accumulator, returning the
-     * new accumulator value.
-     */
-    AccumT addInput(AccumT accumulator, InputT input);
-
-    /**
-     * Returns the output value that is the result of all accumulators from KeyGroups
-     * that are assigned to this operator.
-     */
-    OutputT extractOutput(Iterable<AccumT> accumulators);
-  }
-
-  private abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {
-
-    private String stateName;
-    private String namespace;
-    private Coder<AccumT> coder;
-    private KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;
-
-    AbstractKeyGroupState(
-        String stateName,
-        String namespace,
-        Coder<AccumT> coder,
-        KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
-      this.stateName = stateName;
-      this.namespace = namespace;
-      this.coder = coder;
-      this.keyGroupCombiner = keyGroupCombiner;
-    }
-
-    /**
-     * Choose keyGroup of input and addInput to accumulator.
-     */
-    void addInput(InputT input) {
-      int keyGroupIdx = keyedStateBackend.getCurrentKeyGroupIndex();
-      int localIdx = getIndexForKeyGroup(keyGroupIdx);
-      Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
-      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-      if (tuple2 == null) {
-        tuple2 = new Tuple2<>();
-        tuple2.f0 = coder;
-        tuple2.f1 = new HashMap<>();
-        stateTable.put(stateName, tuple2);
-      }
-      Map<String, AccumT> map = (Map<String, AccumT>) tuple2.f1;
-      AccumT accumulator = map.get(namespace);
-      if (accumulator == null) {
-        accumulator = keyGroupCombiner.createAccumulator();
-      }
-      accumulator = keyGroupCombiner.addInput(accumulator, input);
-      map.put(namespace, accumulator);
-    }
-
-    /**
-     * Get all accumulators and invoke extractOutput().
-     */
-    OutputT extractOutput() {
-      List<AccumT> accumulators = new ArrayList<>(stateTables.length);
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
-          if (accumulator != null) {
-            accumulators.add(accumulator);
-          }
-        }
-      }
-      return keyGroupCombiner.extractOutput(accumulators);
-    }
-
-    /**
-     * Find the first accumulator and return immediately.
-     */
-    boolean isEmptyInternal() {
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
-          if (accumulator != null) {
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Clear accumulators and clean empty map.
-     */
-    void clearInternal() {
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          tuple2.f1.remove(namespace);
-          if (tuple2.f1.size() == 0) {
-            stateTable.remove(stateName);
-          }
-        }
-      }
-    }
-
-  }
-
-  private int getIndexForKeyGroup(int keyGroupIdx) {
-    checkArgument(localKeyGroupRange.contains(keyGroupIdx),
-        "Key Group " + keyGroupIdx + " does not belong to the local range.");
-    return keyGroupIdx - this.localKeyGroupRangeStartIdx;
-  }
-
-  private class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, List<T>, Iterable<T>> {
-
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public Iterable<T> extractOutput(Iterable<List<T>> accumulators) {
-      List<T> result = new ArrayList<>();
-      // maybe can return an unmodifiable view.
-      for (List<T> list : accumulators) {
-        result.addAll(list);
-      }
-      return result;
-    }
-  }
-
-  private class FlinkKeyGroupBagState<T> extends AbstractKeyGroupState<T, List<T>, Iterable<T>>
-      implements BagState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
-
-    FlinkKeyGroupBagState(
-        StateTag<? super K, BagState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-      super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
-          new KeyGroupBagCombiner<T>());
-      this.namespace = namespace;
-      this.address = address;
-    }
-
-    @Override
-    public void add(T input) {
-      addInput(input);
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      Iterable<T> result = extractOutput();
-      return result != null ? result : Collections.<T>emptyList();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return isEmptyInternal();
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyGroupBagState<?> that = (FlinkKeyGroupBagState<?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  /**
-   * Snapshots the state {@code (stateName -> (valueCoder && (namespace -> value)))} for a given
-   * {@code keyGroupIdx}.
-   *
-   * @param keyGroupIdx the id of the key-group to be put in the snapshot.
-   * @param out the stream to write to.
-   */
-  public void snapshotKeyGroupState(int keyGroupIdx, DataOutputStream out) throws Exception {
-    int localIdx = getIndexForKeyGroup(keyGroupIdx);
-    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
-    Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE,
-        "Too many States: " + stateTable.size() + ". Currently at most "
-            + Short.MAX_VALUE + " states are supported");
-    out.writeShort(stateTable.size());
-    for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> entry : stateTable.entrySet()) {
-      out.writeUTF(entry.getKey());
-      Coder coder = entry.getValue().f0;
-      InstantiationUtil.serializeObject(out, coder);
-      Map<String, ?> map = entry.getValue().f1;
-      out.writeInt(map.size());
-      for (Map.Entry<String, ?> entry1 : map.entrySet()) {
-        StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
-        coder.encode(entry1.getValue(), out, Context.NESTED);
-      }
-    }
-  }
-
-  /**
-   * Restore the state {@code (stateName -> (valueCoder && (namespace -> value)))}
-   * for a given {@code keyGroupIdx}.
-   *
-   * @param keyGroupIdx the id of the key-group to be put in the snapshot.
-   * @param in the stream to read from.
-   * @param userCodeClassLoader the class loader that will be used to deserialize
-   *                            the valueCoder.
-   */
-  public void restoreKeyGroupState(int keyGroupIdx, DataInputStream in,
-                                   ClassLoader userCodeClassLoader) throws Exception {
-    int localIdx = getIndexForKeyGroup(keyGroupIdx);
-    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
-    int numStates = in.readShort();
-    for (int i = 0; i < numStates; ++i) {
-      String stateName = in.readUTF();
-      Coder coder = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
-      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-      if (tuple2 == null) {
-        tuple2 = new Tuple2<>();
-        tuple2.f0 = coder;
-        tuple2.f1 = new HashMap<>();
-        stateTable.put(stateName, tuple2);
-      }
-      Map<String, Object> map = (Map<String, Object>) tuple2.f1;
-      int mapSize = in.readInt();
-      for (int j = 0; j < mapSize; j++) {
-        String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
-        Object value = coder.decode(in, Context.NESTED);
-        map.put(namespace, value);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
deleted file mode 100644
index 2bf0bf1..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.common.collect.Iterators;
-import java.util.Collections;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-
-/**
- * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
- * to manage the split-distribute state.
- *
- * <p>Elements in ListState will be redistributed in round robin fashion
- * to operators when restarting with a different parallelism.
- *
- *  <p>Note:
- *  Ignore index of key and namespace.
- *  Just implement BagState.
- */
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
-
-  private final OperatorStateBackend stateBackend;
-
-  public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
-    this.stateBackend = stateBackend;
-  }
-
-  @Override
-  public K getKey() {
-    return null;
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address) {
-
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address,
-      final StateContext<?> context) {
-
-    return address.bind(new StateTag.StateBinder<K>() {
-
-      @Override
-      public <T> ValueState<T> bindValue(
-          StateTag<? super K, ValueState<T>> address,
-          Coder<T> coder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", ValueState.class.getSimpleName()));
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          StateTag<? super K, BagState<T>> address,
-          Coder<T> elemCoder) {
-
-        return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(
-          StateTag<? super K, SetState<T>> address,
-          Coder<T> elemCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", SetState.class.getSimpleName()));
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          StateTag<? super K, MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", MapState.class.getSimpleName()));
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT>
-      bindCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
-
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<
-              ? super K, InputT, AccumT, OutputT> combineFn) {
-        throw new UnsupportedOperationException(
-            "bindKeyedCombiningValueWithContext is not supported.");
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", CombiningState.class.getSimpleName()));
-      }
-    });
-  }
-
-  private static class FlinkSplitBagState<K, T> implements BagState<T> {
-
-    private final ListStateDescriptor<T> descriptor;
-    private OperatorStateBackend flinkStateBackend;
-    private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
-
-    FlinkSplitBagState(
-        OperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-      this.flinkStateBackend = flinkStateBackend;
-      this.namespace = namespace;
-      this.address = address;
-
-      CoderTypeInformation<T> typeInfo =
-          new CoderTypeInformation<>(coder);
-
-      descriptor = new ListStateDescriptor<>(address.getId(),
-          typeInfo.createSerializer(new ExecutionConfig()));
-    }
-
-    @Override
-    public void add(T input) {
-      try {
-        flinkStateBackend.getOperatorState(descriptor).add(input);
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      try {
-        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
-        return result != null ? result : Collections.<T>emptyList();
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
-            // PartitionableListState.get() return empty collection When there is no element,
-            // KeyedListState different. (return null)
-            return result == null || Iterators.size(result.iterator()) == 0;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getOperatorState(descriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
deleted file mode 100644
index 4f961e5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ /dev/null
@@ -1,1053 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.common.collect.Lists;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.joda.time.Instant;
-
-/**
- * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state.
- *
- * <p>Note: In the Flink streaming runner the key is always encoded
- * using an {@link Coder} and stored in a {@link ByteBuffer}.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
-  private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-  private Coder<K> keyCoder;
-
-  // on recovery, these will no be properly set because we don't
-  // know which watermark hold states there are in the Flink State Backend
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
-
-  public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) {
-    this.flinkStateBackend = flinkStateBackend;
-    this.keyCoder = keyCoder;
-  }
-
-  /**
-   * Returns the minimum over all watermark holds.
-   */
-  public Instant watermarkHold() {
-    long min = Long.MAX_VALUE;
-    for (Instant hold: watermarkHolds.values()) {
-      min = Math.min(min, hold.getMillis());
-    }
-    return new Instant(min);
-  }
-
-  @Override
-  public K getKey() {
-    ByteBuffer keyBytes = flinkStateBackend.getCurrentKey();
-    try {
-      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
-    } catch (CoderException e) {
-      throw new RuntimeException("Error decoding key.", e);
-    }
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address) {
-
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address,
-      final StateContext<?> context) {
-
-    return address.bind(new StateTag.StateBinder<K>() {
-
-      @Override
-      public <T> ValueState<T> bindValue(
-          StateTag<? super K, ValueState<T>> address,
-          Coder<T> coder) {
-
-        return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          StateTag<? super K, BagState<T>> address,
-          Coder<T> elemCoder) {
-
-        return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(
-          StateTag<? super K, SetState<T>> address,
-          Coder<T> elemCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", SetState.class.getSimpleName()));
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          StateTag<? super K, MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", MapState.class.getSimpleName()));
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT>
-      bindCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-
-        return new FlinkCombiningState<>(
-            flinkStateBackend, address, combineFn, namespace, accumCoder);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkKeyedCombiningState<>(
-            flinkStateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkStateInternals.this);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<
-              ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkCombiningStateWithContext<>(
-            flinkStateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkStateInternals.this,
-            CombineContextFactory.createFromStateContext(context));
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
-
-        return new FlinkWatermarkHoldState<>(
-            flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
-      }
-    });
-  }
-
-  private static class FlinkValueState<K, T> implements ValueState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
-    private final ValueStateDescriptor<T> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
-    FlinkValueState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public void write(T input) {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).update(input);
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkBagState<K, T> implements BagState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
-    private final ListStateDescriptor<T> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
-    FlinkBagState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
-    }
-
-    @Override
-    public void add(T input) {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).add(input);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to bag state.", e);
-      }
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      try {
-        Iterable<T> result = flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).get();
-
-        return result != null ? result : Collections.<T>emptyList();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            Iterable<T> result = flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).get();
-            return result == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-
-    FlinkCombiningState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator();
-        }
-        current = combineFn.addInput(current, value);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-              namespace.stringKey(),
-              StringSerializer.INSTANCE,
-              flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(accum);
-        } else {
-          return combineFn.extractOutput(combineFn.createAccumulator());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkCombiningState<?, ?, ?, ?> that =
-          (FlinkCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-
-    FlinkKeyedCombiningState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey());
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
-        } else {
-          return combineFn.extractOutput(
-              flinkStateInternals.getKey(),
-              combineFn.createAccumulator(flinkStateInternals.getKey()));
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
-      implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
-    private final CombineWithContext.KeyedCombineFnWithContext<
-        ? super K, InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-    private final CombineWithContext.Context context;
-
-    FlinkCombiningStateWithContext(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-        CombineWithContext.KeyedCombineFnWithContext<
-            ? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals,
-        CombineWithContext.Context context) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-      this.context = context;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Lists.newArrayList(current, accum),
-              context);
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
-      implements WatermarkHoldState<W> {
-    private final StateTag<? super K, WatermarkHoldState<W>> address;
-    private final OutputTimeFn<? super W> outputTimeFn;
-    private final StateNamespace namespace;
-    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-    private final ValueStateDescriptor<Instant> flinkStateDescriptor;
-
-    public FlinkWatermarkHoldState(
-        KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        StateNamespace namespace,
-        OutputTimeFn<? super W> outputTimeFn) {
-      this.address = address;
-      this.outputTimeFn = outputTimeFn;
-      this.namespace = namespace;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public WatermarkHoldState<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-
-    }
-
-    @Override
-    public void add(Instant value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-              namespace.stringKey(),
-              StringSerializer.INSTANCE,
-              flinkStateDescriptor);
-
-        Instant current = state.value();
-        if (current == null) {
-          state.update(value);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
-        } else {
-          Instant combined = outputTimeFn.combine(current, value);
-          state.update(combined);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public Instant read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-        return state.value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public void clear() {
-      flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-        state.clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
-
-      if (!address.equals(that.address)) {
-        return false;
-      }
-      if (!outputTimeFn.equals(that.outputTimeFn)) {
-        return false;
-      }
-      return namespace.equals(that.namespace);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = address.hashCode();
-      result = 31 * result + outputTimeFn.hashCode();
-      result = 31 * result + namespace.hashCode();
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
deleted file mode 100644
index b38a520..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.DataOutputStream;
-
-/**
- * This interface is used to checkpoint key-groups state.
- */
-public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator{
-  /**
-   * Snapshots the state for a given {@code keyGroupIdx}.
-   *
-   * <p>AbstractStreamOperator would call this hook in
-   * AbstractStreamOperator.snapshotState() while iterating over the key groups.
-   * @param keyGroupIndex the id of the key-group to be put in the snapshot.
-   * @param out the stream to write to.
-   */
-  void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
deleted file mode 100644
index 2bdfc6e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.DataInputStream;
-
-/**
- * This interface is used to restore key-groups state.
- */
-public interface KeyGroupRestoringOperator {
-  /**
-   * Restore the state for a given {@code keyGroupIndex}.
-   * @param keyGroupIndex the id of the key-group to be put in the snapshot.
-   * @param in the stream to read from.
-   */
-  void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
deleted file mode 100644
index 0004e9e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal state implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties
deleted file mode 100644
index 4b6a708..0000000
--- a/runners/flink/runner/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
deleted file mode 100644
index 10d6d9d..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
-import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.junit.Assert;
-
-/**
- * Test for {@link EncodedValueComparator}.
- */
-public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> {
-
-  @Override
-  protected TypeComparator<byte[]> createComparator(boolean ascending) {
-    return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig());
-  }
-
-  @Override
-  protected TypeSerializer<byte[]> createSerializer() {
-    return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig());
-  }
-
-  @Override
-  protected void deepEquals(String message, byte[] should, byte[] is) {
-    Assert.assertArrayEquals(message, should, is);
-  }
-
-  @Override
-  protected byte[][] getSortedTestData() {
-    StringUtf8Coder coder = StringUtf8Coder.of();
-
-    try {
-      return new byte[][]{
-          CoderUtils.encodeToByteArray(coder, ""),
-          CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"),
-          CoderUtils.encodeToByteArray(coder, "aaaa"),
-          CoderUtils.encodeToByteArray(coder, "abcd"),
-          CoderUtils.encodeToByteArray(coder, "abce"),
-          CoderUtils.encodeToByteArray(coder, "abdd"),
-          CoderUtils.encodeToByteArray(coder, "accd"),
-          CoderUtils.encodeToByteArray(coder, "bbcd")
-      };
-    } catch (CoderException e) {
-      throw new RuntimeException("Could not encode values.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
deleted file mode 100644
index d9d174c..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-
-/**
- * Tests the proper registration of the Flink runner.
- */
-public class FlinkRunnerRegistrarTest {
-
-  @Test
-  public void testFullName() {
-    String[] args =
-        new String[] {String.format("--runner=%s", FlinkRunner.class.getName())};
-    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), FlinkRunner.class);
-  }
-
-  @Test
-  public void testClassName() {
-    String[] args =
-        new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())};
-    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), FlinkRunner.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
deleted file mode 100644
index d6240c4..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-
-/**
- * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the
- * {@link FlinkRunner}.
- */
-public class FlinkTestPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new test pipeline for batch execution.
-   *
-   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
-   * {@link Pipeline#run} to execute the pipeline and check the tests.
-   */
-  public static FlinkTestPipeline createForBatch() {
-    return create(false);
-  }
-
-  /**
-   * Creates and returns a new test pipeline for streaming execution.
-   *
-   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
-   * {@link Pipeline#run} to execute the pipeline and check the tests.
-   *
-   * @return The Test Pipeline
-   */
-  public static FlinkTestPipeline createForStreaming() {
-    return create(true);
-  }
-
-  /**
-   * Creates and returns a new test pipeline for streaming or batch execution.
-   *
-   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
-   * {@link Pipeline#run} to execute the pipeline and check the tests.
-   *
-   * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
-   * @return The Test Pipeline.
-   */
-  private static FlinkTestPipeline create(boolean streaming) {
-    TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
-    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
-  }
-
-  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
-              PipelineOptions options) {
-    super(runner, options);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
deleted file mode 100644
index 06187f6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.HashMap;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.joda.time.Instant;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
- */
-public class PipelineOptionsTest {
-
-  /**
-   * Pipeline options.
-   */
-  public interface MyOptions extends FlinkPipelineOptions {
-    @Description("Bla bla bla")
-    @Default.String("Hello")
-    String getTestOption();
-    void setTestOption(String value);
-  }
-
-  private static MyOptions options;
-  private static SerializedPipelineOptions serializedOptions;
-
-  private static final String[] args = new String[]{"--testOption=nothing"};
-
-  @BeforeClass
-  public static void beforeTest() {
-    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
-    serializedOptions = new SerializedPipelineOptions(options);
-  }
-
-  @Test
-  public void testDeserialization() {
-    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
-    assertEquals("nothing", deserializedOptions.getTestOption());
-  }
-
-  @Test
-  public void testIgnoredFieldSerialization() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setStateBackend(new MemoryStateBackend());
-
-    FlinkPipelineOptions deserialized =
-        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
-
-    assertNull(deserialized.getStateBackend());
-  }
-
-  @Test
-  public void testCaching() {
-    PipelineOptions deserializedOptions =
-        serializedOptions.getPipelineOptions().as(PipelineOptions.class);
-
-    assertNotNull(deserializedOptions);
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-  }
-
-  @Test(expected = Exception.class)
-  public void testNonNull() {
-    new SerializedPipelineOptions(null);
-  }
-
-  @Test(expected = Exception.class)
-  public void parDoBaseClassPipelineOptionsNullTest() {
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
-        new TupleTag<String>("main-output"),
-        Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
-        WindowingStrategy.globalDefault(),
-        new HashMap<Integer, PCollectionView<?>>(),
-        Collections.<PCollectionView<?>>emptyList(),
-        null,
-        null);
-
-  }
-
-  /**
-   * Tests that PipelineOptions are present after serialization.
-   */
-  @Test
-  public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
-
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        new TestDoFn(),
-        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
-        new TupleTag<String>("main-output"),
-        Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<String>(),
-        WindowingStrategy.globalDefault(),
-        new HashMap<Integer, PCollectionView<?>>(),
-        Collections.<PCollectionView<?>>emptyList(),
-        options,
-        null);
-
-    final byte[] serialized = SerializationUtils.serialize(doFnOperator);
-
-    @SuppressWarnings("unchecked")
-    DoFnOperator<Object, Object, Object> deserialized =
-        (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
-
-    TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
-        new TypeHint<WindowedValue<Object>>() {});
-
-    OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
-        new OneInputStreamOperatorTestHarness<>(deserialized,
-            typeInformation.createSerializer(new ExecutionConfig()));
-
-    testHarness.open();
-
-    // execute once to access options
-    testHarness.processElement(new StreamRecord<>(
-        WindowedValue.of(
-            new Object(),
-            Instant.now(),
-            GlobalWindow.INSTANCE,
-            PaneInfo.NO_FIRING)));
-
-    testHarness.close();
-
-  }
-
-
-  private static class TestDoFn extends DoFn<String, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      Assert.assertNotNull(c.getPipelineOptions());
-      Assert.assertEquals(
-          options.getTestOption(),
-          c.getPipelineOptions().as(MyOptions.class).getTestOption());
-    }
-  }
-}