You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:44 UTC

[06/17] incubator-beam git commit: [BEAM-253] Unify Flink-Streaming Operator Wrappers

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
deleted file mode 100644
index e6a43dc..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ /dev/null
@@ -1,733 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTable;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * An implementation of the Beam {@link StateInternals}. This implementation simply keeps elements in memory.
- * This state is periodically checkpointed by Flink, for fault-tolerance.
- *
- * TODO: State should be rewritten to redirect to Flink per-key state so that coders and combiners don't need
- * to be serialized along with encoded values when snapshotting.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
-  private final K key;
-
-  private final Coder<K> keyCoder;
-
-  private final Coder<? extends BoundedWindow> windowCoder;
-
-  private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
-
-  private Instant watermarkHoldAccessor;
-
-  public FlinkStateInternals(K key,
-                             Coder<K> keyCoder,
-                             Coder<? extends BoundedWindow> windowCoder,
-                             OutputTimeFn<? super BoundedWindow> outputTimeFn) {
-    this.key = key;
-    this.keyCoder = keyCoder;
-    this.windowCoder = windowCoder;
-    this.outputTimeFn = outputTimeFn;
-  }
-
-  public Instant getWatermarkHold() {
-    return watermarkHoldAccessor;
-  }
-
-  /**
-   * This is the interface state has to implement in order for it to be fault tolerant when
-   * executed by the FlinkRunner.
-   */
-  private interface CheckpointableIF {
-
-    boolean shouldPersist();
-
-    void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
-  }
-
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
-    @Override
-    protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
-      return new StateTag.StateBinder<K>() {
-
-        @Override
-        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-          return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
-        }
-
-        @Override
-        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-          return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
-          return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
-        }
-      };
-    }
-  };
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  @Override
-  public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
-    return inMemoryState.get(namespace, address, null);
-  }
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
-    return inMemoryState.get(namespace, address, c);
-  }
-
-  public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-    checkpointBuilder.writeInt(getNoOfElements());
-
-    for (State location : inMemoryState.values()) {
-      if (!(location instanceof CheckpointableIF)) {
-        throw new IllegalStateException(String.format(
-            "%s wasn't created by %s -- unable to persist it",
-            location.getClass().getSimpleName(),
-            getClass().getSimpleName()));
-      }
-      ((CheckpointableIF) location).persistState(checkpointBuilder);
-    }
-  }
-
-  public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
-      throws IOException, ClassNotFoundException {
-
-    // the number of elements to read.
-    int noOfElements = checkpointReader.getInt();
-    for (int i = 0; i < noOfElements; i++) {
-      decodeState(checkpointReader, loader);
-    }
-  }
-
-  /**
-   * We remove the first character which encodes the type of the stateTag ('s' for system
-   * and 'u' for user). For more details check out the source of
-   * {@link StateTags.StateTagBase#getId()}.
-   */
-  private void decodeState(StateCheckpointReader reader, ClassLoader loader)
-      throws IOException, ClassNotFoundException {
-
-    StateType stateItemType = StateType.deserialize(reader);
-    ByteString stateKey = reader.getTag();
-
-    // first decode the namespace and the tagId...
-    String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
-    if (namespaceAndTag.length != 2) {
-      throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
-    }
-    StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
-
-    // ... decide if it is a system or user stateTag...
-    char ownerTag = namespaceAndTag[1].charAt(0);
-    if (ownerTag != 's' && ownerTag != 'u') {
-      throw new RuntimeException("Invalid StateTag name.");
-    }
-    boolean isSystemTag = ownerTag == 's';
-    String tagId = namespaceAndTag[1].substring(1);
-
-    // ...then decode the coder (if there is one)...
-    Coder<?> coder = null;
-    switch (stateItemType) {
-      case VALUE:
-      case LIST:
-      case ACCUMULATOR:
-        ByteString coderBytes = reader.getData();
-        coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
-        break;
-      case WATERMARK:
-        break;
-    }
-
-    // ...then decode the combiner function (if there is one)...
-    CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
-    switch (stateItemType) {
-      case ACCUMULATOR:
-        ByteString combinerBytes = reader.getData();
-        combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
-        break;
-      case VALUE:
-      case LIST:
-      case WATERMARK:
-        break;
-    }
-
-    //... and finally, depending on the type of the state being decoded,
-    // 1) create the adequate stateTag,
-    // 2) create the state container,
-    // 3) restore the actual content.
-    switch (stateItemType) {
-      case VALUE: {
-        StateTag stateTag = StateTags.value(tagId, coder);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
-        value.restoreState(reader);
-        break;
-      }
-      case WATERMARK: {
-        @SuppressWarnings("unchecked")
-        StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
-        watermark.restoreState(reader);
-        break;
-      }
-      case LIST: {
-        StateTag stateTag = StateTags.bag(tagId, coder);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
-        bag.restoreState(reader);
-        break;
-      }
-      case ACCUMULATOR: {
-        @SuppressWarnings("unchecked")
-        StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
-            (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
-        combiningValue.restoreState(reader);
-        break;
-      }
-      default:
-        throw new RuntimeException("Unknown State Type " + stateItemType + ".");
-    }
-  }
-
-  private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
-    StringBuilder sb = new StringBuilder();
-    try {
-      namespace.appendTo(sb);
-      sb.append('+');
-      address.appendTo(sb);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return ByteString.copyFromUtf8(sb.toString());
-  }
-
-  private int getNoOfElements() {
-    int noOfElements = 0;
-    for (State state : inMemoryState.values()) {
-      if (!(state instanceof CheckpointableIF)) {
-        throw new RuntimeException("State Implementations used by the " +
-            "Flink Dataflow Runner should implement the CheckpointableIF interface.");
-      }
-
-      if (((CheckpointableIF) state).shouldPersist()) {
-        noOfElements++;
-      }
-    }
-    return noOfElements;
-  }
-
-  private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
-
-    private final ByteString stateKey;
-    private final Coder<T> elemCoder;
-
-    private T value = null;
-
-    public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
-      this.stateKey = stateKey;
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public void clear() {
-      value = null;
-    }
-
-    @Override
-    public void write(T input) {
-      this.value = input;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return value != null;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (value != null) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-        // encode the value into a ByteString
-        ByteString.Output stream = ByteString.newOutput();
-        elemCoder.encode(value, stream, Coder.Context.OUTER);
-        ByteString data = stream.toByteString();
-
-        checkpointBuilder.addValueBuilder()
-          .setTag(stateKey)
-          .setData(coder)
-          .setData(data);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      ByteString valueContent = checkpointReader.getData();
-      T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-      write(outValue);
-    }
-  }
-
-  private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
-      implements WatermarkHoldState<W>, CheckpointableIF {
-
-    private final ByteString stateKey;
-
-    private Instant minimumHold = null;
-
-    private OutputTimeFn<? super W> outputTimeFn;
-
-    public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
-      this.stateKey = stateKey;
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this WatermarkBagInternal.
-      minimumHold = null;
-      watermarkHoldAccessor = null;
-    }
-
-    @Override
-    public void add(Instant watermarkHold) {
-      if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
-        watermarkHoldAccessor = watermarkHold;
-        minimumHold = watermarkHold;
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          return minimumHold == null;
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public Instant read() {
-      return minimumHold;
-    }
-
-    @Override
-    public WatermarkHoldState<W> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toString(minimumHold);
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return minimumHold != null;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (minimumHold != null) {
-        checkpointBuilder.addWatermarkHoldsBuilder()
-            .setTag(stateKey)
-            .setTimestamp(minimumHold);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      Instant watermark = checkpointReader.getTimestamp();
-      add(watermark);
-    }
-  }
-
-
-  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
-      final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-      @Override
-      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-        return combineFn.createAccumulator(key);
-      }
-
-      @Override
-      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-        return combineFn.addInput(key, accumulator, value);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-        return combineFn.mergeAccumulators(key, accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-        return combineFn.extractOutput(key, accumulator);
-      }
-    };
-  }
-
-  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
-      final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-      @Override
-      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-        return combineFn.createAccumulator();
-      }
-
-      @Override
-      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-        return combineFn.addInput(accumulator, value);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-        return combineFn.mergeAccumulators(accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-        return combineFn.extractOutput(accumulator);
-      }
-    };
-  }
-
-  private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
-
-    private final ByteString stateKey;
-    private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
-    private final Coder<AccumT> accumCoder;
-    private final CombineWithContext.Context context;
-
-    private AccumT accum = null;
-    private boolean isClear = true;
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
-    }
-
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      this(stateKey, withContext(combineFn), accumCoder, stateContext);
-    }
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      checkNotNull(combineFn);
-      checkNotNull(accumCoder);
-
-      this.stateKey = stateKey;
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-      this.context = new CombineWithContext.Context() {
-        @Override
-        public PipelineOptions getPipelineOptions() {
-          return stateContext.getPipelineOptions();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view) {
-          return stateContext.sideInput(view);
-        }
-      };
-      accum = combineFn.createAccumulator(key, context);
-    }
-
-    @Override
-    public void clear() {
-      accum = combineFn.createAccumulator(key, context);
-      isClear = true;
-    }
-
-    @Override
-    public void add(InputT input) {
-      isClear = false;
-      accum = combineFn.addInput(key, accum, input, context);
-    }
-
-    @Override
-    public AccumT getAccum() {
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return isClear;
-        }
-      };
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      isClear = false;
-      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators, context);
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, accum, context);
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return !isClear;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (!isClear) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(accumCoder);
-
-        // serialize the combiner.
-        byte[] combiner = InstantiationUtil.serializeObject(combineFn);
-
-        // encode the accumulator into a ByteString
-        ByteString.Output stream = ByteString.newOutput();
-        accumCoder.encode(accum, stream, Coder.Context.OUTER);
-        ByteString data = stream.toByteString();
-
-        // put the flag that the next serialized element is an accumulator
-        checkpointBuilder.addAccumulatorBuilder()
-          .setTag(stateKey)
-          .setData(coder)
-          .setData(combiner)
-          .setData(data);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      ByteString valueContent = checkpointReader.getData();
-      AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-      addAccum(accum);
-    }
-  }
-
-  private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
-    private final List<T> contents = new ArrayList<>();
-
-    private final ByteString stateKey;
-    private final Coder<T> elemCoder;
-
-    public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
-      this.stateKey = stateKey;
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public void clear() {
-      contents.clear();
-    }
-
-    @Override
-    public Iterable<T> read() {
-      return contents;
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public void add(T input) {
-      contents.add(input);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return contents.isEmpty();
-        }
-      };
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return !contents.isEmpty();
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (!contents.isEmpty()) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-        checkpointBuilder.addListUpdatesBuilder()
-            .setTag(stateKey)
-            .setData(coder)
-            .writeInt(contents.size());
-
-        for (T item : contents) {
-          // encode the element
-          ByteString.Output stream = ByteString.newOutput();
-          elemCoder.encode(item, stream, Coder.Context.OUTER);
-          ByteString data = stream.toByteString();
-
-          // add the data to the checkpoint.
-          checkpointBuilder.setData(data);
-        }
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      int noOfValues = checkpointReader.getInt();
-      for (int j = 0; j < noOfValues; j++) {
-        ByteString valueContent = checkpointReader.getData();
-        T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-        add(outValue);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
deleted file mode 100644
index 5a843ab..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointReader {
-
-  private final DataInputView input;
-
-  public StateCheckpointReader(DataInputView in) {
-    this.input = in;
-  }
-
-  public ByteString getTag() throws IOException {
-    return ByteString.copyFrom(readRawData());
-  }
-
-  public String getTagToString() throws IOException {
-    return input.readUTF();
-  }
-
-  public ByteString getData() throws IOException {
-    return ByteString.copyFrom(readRawData());
-  }
-
-  public int getInt() throws IOException {
-    validate();
-    return input.readInt();
-  }
-
-  public byte getByte() throws IOException {
-    validate();
-    return input.readByte();
-  }
-
-  public Instant getTimestamp() throws IOException {
-    validate();
-    Long watermarkMillis = input.readLong();
-    return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
-  }
-
-  public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
-    return deserializeObject(keySerializer);
-  }
-
-  public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
-    return objectSerializer.deserialize(input);
-  }
-
-  /////////      Helper Methods      ///////
-
-  private byte[] readRawData() throws IOException {
-    validate();
-    int size = input.readInt();
-
-    byte[] serData = new byte[size];
-    int bytesRead = input.read(serData);
-    if (bytesRead != size) {
-      throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
-    }
-    return serData;
-  }
-
-  private void validate() {
-    if (this.input == null) {
-      throw new RuntimeException("StateBackend not initialized yet.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
deleted file mode 100644
index 4fbd6f0..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class StateCheckpointUtils {
-
-  public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
-               StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-    int noOfKeys = perKeyStateInternals.size();
-    writer.writeInt(noOfKeys);
-    for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
-      K key = keyStatePair.getKey();
-      FlinkStateInternals<K> state = keyStatePair.getValue();
-
-      // encode the key
-      writer.serializeKey(key, keySerializer);
-
-      // write the associated state
-      state.persistState(writer);
-    }
-  }
-
-  public static <K> Map<K, FlinkStateInternals<K>> decodeState(
-      StateCheckpointReader reader,
-      OutputTimeFn<? super BoundedWindow> outputTimeFn,
-      Coder<K> keyCoder,
-      Coder<? extends BoundedWindow> windowCoder,
-      ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
-    int noOfKeys = reader.getInt();
-    Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
-    perKeyStateInternals.clear();
-
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-    for (int i = 0; i < noOfKeys; i++) {
-
-      // decode the key.
-      K key = reader.deserializeKey(keySerializer);
-
-      //decode the state associated to the key.
-      FlinkStateInternals<K> stateForKey =
-          new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
-      stateForKey.restoreState(reader, classLoader);
-      perKeyStateInternals.put(key, stateForKey);
-    }
-    return perKeyStateInternals;
-  }
-
-  //////////////        Encoding/Decoding the Timers        ////////////////
-
-
-  public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
-                StateCheckpointWriter writer,
-                Coder<K> keyCoder) throws IOException {
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-    int noOfKeys = allTimers.size();
-    writer.writeInt(noOfKeys);
-    for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
-      K key = timersPerKey.getKey();
-
-      // encode the key
-      writer.serializeKey(key, keySerializer);
-
-      // write the associated timers
-      Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
-      encodeTimerDataForKey(writer, timers);
-    }
-  }
-
-  public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
-      StateCheckpointReader reader,
-      Coder<? extends BoundedWindow> windowCoder,
-      Coder<K> keyCoder) throws IOException {
-
-    int noOfKeys = reader.getInt();
-    Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
-    activeTimers.clear();
-
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-    for (int i = 0; i < noOfKeys; i++) {
-
-      // decode the key.
-      K key = reader.deserializeKey(keySerializer);
-
-      // decode the associated timers.
-      Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
-      activeTimers.put(key, timers);
-    }
-    return activeTimers;
-  }
-
-  private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
-    // encode timers
-    writer.writeInt(timers.size());
-    for (TimerInternals.TimerData timer : timers) {
-      String stringKey = timer.getNamespace().stringKey();
-
-      writer.setTag(stringKey);
-      writer.setTimestamp(timer.getTimestamp());
-      writer.writeInt(timer.getDomain().ordinal());
-    }
-  }
-
-  private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
-      StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
-
-    // decode the timers: first their number and then the content itself.
-    int noOfTimers = reader.getInt();
-    Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
-    for (int i = 0; i < noOfTimers; i++) {
-      String stringKey = reader.getTagToString();
-      Instant instant = reader.getTimestamp();
-      TimeDomain domain = TimeDomain.values()[reader.getInt()];
-
-      StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
-      timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
-    }
-    return timers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
deleted file mode 100644
index d09157c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointWriter {
-
-  private final AbstractStateBackend.CheckpointStateOutputView output;
-
-  public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
-    return new StateCheckpointWriter(output);
-  }
-
-  private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
-    this.output = output;
-  }
-
-  /////////      Creating the serialized versions of the different types of state held by dataflow      ///////
-
-  public StateCheckpointWriter addValueBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.VALUE, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.WATERMARK, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.LIST, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.ACCUMULATOR, this);
-    return this;
-  }
-
-  /////////      Setting the tag for a given state element      ///////
-
-  public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
-    return writeData(stateKey.toByteArray());
-  }
-
-  public StateCheckpointWriter setTag(String stateKey) throws IOException {
-    output.writeUTF(stateKey);
-    return this;
-  }
-
-
-  public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
-    return serializeObject(key, keySerializer);
-  }
-
-  public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
-    objectSerializer.serialize(object, output);
-    return this;
-  }
-
-  /////////      Write the actual serialized data      //////////
-
-  public StateCheckpointWriter setData(ByteString data) throws IOException {
-    return writeData(data.toByteArray());
-  }
-
-  public StateCheckpointWriter setData(byte[] data) throws IOException {
-    return writeData(data);
-  }
-
-  public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
-    validate();
-    output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
-    return this;
-  }
-
-  public StateCheckpointWriter writeInt(int number) throws IOException {
-    validate();
-    output.writeInt(number);
-    return this;
-  }
-
-  public StateCheckpointWriter writeByte(byte b) throws IOException {
-    validate();
-    output.writeByte(b);
-    return this;
-  }
-
-  /////////      Helper Methods      ///////
-
-  private StateCheckpointWriter writeData(byte[] data) throws IOException {
-    validate();
-    output.writeInt(data.length);
-    output.write(data);
-    return this;
-  }
-
-  private void validate() {
-    if (this.output == null) {
-      throw new RuntimeException("StateBackend not initialized yet.");
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
deleted file mode 100644
index 5849773..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.IOException;
-
-/**
- * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization
- * purposes.
- * */
-public enum StateType {
-
-  VALUE(0),
-
-  WATERMARK(1),
-
-  LIST(2),
-
-  ACCUMULATOR(3);
-
-  private final int numVal;
-
-  StateType(int value) {
-    this.numVal = value;
-  }
-
-  public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
-    if (output == null) {
-      throw new IllegalArgumentException("Cannot write to a null output.");
-    }
-
-    if(type.numVal < 0 || type.numVal > 3) {
-      throw new RuntimeException("Unknown State Type " + type + ".");
-    }
-
-    output.writeByte((byte) type.numVal);
-  }
-
-  public static StateType deserialize(StateCheckpointReader input) throws IOException {
-    if (input == null) {
-      throw new IllegalArgumentException("Cannot read from a null input.");
-    }
-
-    int typeInt = (int) input.getByte();
-    if(typeInt < 0 || typeInt > 3) {
-      throw new RuntimeException("Unknown State Type " + typeInt + ".");
-    }
-
-    StateType resultType = null;
-    for(StateType st: values()) {
-      if(st.numVal == typeInt) {
-        resultType = st;
-        break;
-      }
-    }
-    return resultType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index c24d91d..77e8a47 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -31,20 +31,26 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
 
 /**
- * Tests the serialization and deserialization of PipelineOptions.
+ * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
  */
 public class PipelineOptionsTest {
 
@@ -58,7 +64,7 @@ public class PipelineOptionsTest {
   private static MyOptions options;
   private static SerializedPipelineOptions serializedOptions;
 
-  private final static String[] args = new String[]{"--testOption=nothing"};
+  private static final String[] args = new String[]{"--testOption=nothing"};
 
   @BeforeClass
   public static void beforeTest() {
@@ -74,7 +80,9 @@ public class PipelineOptionsTest {
 
   @Test
   public void testCaching() {
-    PipelineOptions deserializedOptions = serializedOptions.getPipelineOptions().as(PipelineOptions.class);
+    PipelineOptions deserializedOptions =
+        serializedOptions.getPipelineOptions().as(PipelineOptions.class);
+
     assertNotNull(deserializedOptions);
     assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
     assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
@@ -87,29 +95,57 @@ public class PipelineOptionsTest {
   }
 
   @Test(expected = Exception.class)
-  public void ParDoBaseClassPipelineOptionsNullTest() {
-    new TestParDoWrapper(null, WindowingStrategy.globalDefault(), new TestDoFn());
+  public void parDoBaseClassPipelineOptionsNullTest() {
+    DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
+        new TestDoFn(),
+        TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
+        new TupleTag<>("main-output"),
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<>(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(),
+        Collections.<PCollectionView<?>>emptyList(),
+        null);
+
   }
 
   /**
-   * Tests that PipelineOptions are present after serialization
+   * Tests that PipelineOptions are present after serialization.
    */
   @Test
-  public void ParDoBaseClassPipelineOptionsSerializationTest() throws Exception {
-    TestParDoWrapper wrapper =
-        new TestParDoWrapper(options, WindowingStrategy.globalDefault(), new TestDoFn());
+  public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+
+    DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
+        new TestDoFn(),
+        TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
+        new TupleTag<>("main-output"),
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<>(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(),
+        Collections.<PCollectionView<?>>emptyList(),
+        options);
+
+    final byte[] serialized = SerializationUtils.serialize(doFnOperator);
+
+    @SuppressWarnings("unchecked")
+    DoFnOperator<Object, Object, Object> deserialized =
+        (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
 
-    final byte[] serialized = SerializationUtils.serialize(wrapper);
-    TestParDoWrapper deserialize = (TestParDoWrapper) SerializationUtils.deserialize(serialized);
+    OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
+        new OneInputStreamOperatorTestHarness<>(deserialized, new ExecutionConfig());
+
+    testHarness.open();
 
     // execute once to access options
-    deserialize.flatMap(
+    testHarness.processElement(new StreamRecord<>(
         WindowedValue.of(
             new Object(),
             Instant.now(),
             GlobalWindow.INSTANCE,
-            PaneInfo.NO_FIRING),
-        Mockito.mock(Collector.class));
+            PaneInfo.NO_FIRING)));
+
+    testHarness.close();
 
   }
 
@@ -124,35 +160,4 @@ public class PipelineOptionsTest {
           c.getPipelineOptions().as(MyOptions.class).getTestOption());
     }
   }
-
-  private static class TestParDoWrapper extends FlinkAbstractParDoWrapper {
-    public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, OldDoFn doFn) {
-      super(options, windowingStrategy, doFn);
-    }
-
-
-    @Override
-    public WindowingInternals windowingInternalsHelper(
-        WindowedValue inElement,
-        Collector outCollector) {
-      return null;
-    }
-
-    @Override
-    public void sideOutputWithTimestampHelper(
-        WindowedValue inElement,
-        Object output,
-        Instant timestamp,
-        Collector outCollector,
-        TupleTag tag) {}
-
-    @Override
-    public void outputWithTimestampHelper(
-        WindowedValue inElement,
-        Object output,
-        Instant timestamp,
-        Collector outCollector) {}
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
new file mode 100644
index 0000000..627f545
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link FlinkStateInternals}. This is based on the tests for {@code InMemoryStateInternals}.
+ */
+@RunWith(JUnit4.class)
+public class FlinkStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+
+  FlinkStateInternals<String> underTest;
+
+  @Before
+  public void initStateInternals() {
+    MemoryStateBackend backend = new MemoryStateBackend();
+    try {
+      backend.initializeForJob(
+          new DummyEnvironment("test", 1, 0),
+          "test_op",
+          new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    underTest = new FlinkStateInternals<>(backend, StringUtf8Coder.of());
+    try {
+      backend.setCurrentKey(
+          ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Hello")));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testValue() throws Exception {
+    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
+
+    assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    assertNotEquals(
+        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
+        value);
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.write("hello");
+    assertThat(value.read(), Matchers.equalTo("hello"));
+    value.write("world");
+    assertThat(value.read(), Matchers.equalTo("world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.nullValue());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+}