You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:04 UTC

[20/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
deleted file mode 100644
index af4b354..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
deleted file mode 100644
index 9b449aa..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link org.apache.beam.sdk.coders.Coder}s.
- */
-public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
-
-  private final Coder<T> coder;
-
-  public CoderTypeInformation(Coder<T> coder) {
-    checkNotNull(coder);
-    this.coder = coder;
-  }
-
-  public Coder<T> getCoder() {
-    return coder;
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 1;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<T> getTypeClass() {
-    // We don't have the Class, so we have to pass null here. What a shame...
-    return (Class<T>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    CoderTypeInformation that = (CoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeInformation;
-  }
-
-  @Override
-  public String toString() {
-    return "CoderTypeInformation{coder=" + coder + '}';
-  }
-
-  @Override
-  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
-      executionConfig) {
-    throw new UnsupportedOperationException(
-        "Non-encoded values cannot be compared directly.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
deleted file mode 100644
index e210ed9..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.EOFException;
-import java.io.IOException;
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}.
- */
-public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-
-  private Coder<T> coder;
-
-  public CoderTypeSerializer(Coder<T> coder) {
-    this.coder = coder;
-  }
-
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public CoderTypeSerializer<T> duplicate() {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public T createInstance() {
-    return null;
-  }
-
-  @Override
-  public T copy(T t) {
-    try {
-      return CoderUtils.clone(coder, t);
-    } catch (CoderException e) {
-      throw new RuntimeException("Could not clone.", e);
-    }
-  }
-
-  @Override
-  public T copy(T t, T reuse) {
-    return copy(t);
-  }
-
-  @Override
-  public int getLength() {
-    return -1;
-  }
-
-  @Override
-  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
-    DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
-    coder.encode(t, outputWrapper, Coder.Context.NESTED);
-  }
-
-  @Override
-  public T deserialize(DataInputView dataInputView) throws IOException {
-    try {
-      DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
-      return coder.decode(inputWrapper, Coder.Context.NESTED);
-    } catch (CoderException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof EOFException) {
-        throw (EOFException) cause;
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public T deserialize(T t, DataInputView dataInputView) throws IOException {
-    return deserialize(dataInputView);
-  }
-
-  @Override
-  public void copy(
-      DataInputView dataInputView,
-      DataOutputView dataOutputView) throws IOException {
-    serialize(deserialize(dataInputView), dataOutputView);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    CoderTypeSerializer that = (CoderTypeSerializer) o;
-    return coder.equals(that.coder);
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
deleted file mode 100644
index 667ef45..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have
- * been encoded to byte data by a {@link Coder}.
- */
-public class EncodedValueComparator extends TypeComparator<byte[]> {
-
-  /** For storing the Reference in encoded form. */
-  private transient byte[] encodedReferenceKey;
-
-  private final boolean ascending;
-
-  public EncodedValueComparator(boolean ascending) {
-    this.ascending = ascending;
-  }
-
-  @Override
-  public int hash(byte[] record) {
-    return Arrays.hashCode(record);
-  }
-
-  @Override
-  public void setReference(byte[] toCompare) {
-    this.encodedReferenceKey = toCompare;
-  }
-
-  @Override
-  public boolean equalToReference(byte[] candidate) {
-    if (encodedReferenceKey.length != candidate.length) {
-      return false;
-    }
-    int len = candidate.length;
-    for (int i = 0; i < len; i++) {
-      if (encodedReferenceKey[i] != candidate[i]) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public int compareToReference(TypeComparator<byte[]> other) {
-    // VERY IMPORTANT: compareToReference does not behave like Comparable.compare
-    // the meaning of the return value is inverted.
-
-    EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other;
-
-    int len = Math.min(
-        encodedReferenceKey.length,
-        otherEncodedValueComparator.encodedReferenceKey.length);
-
-    for (int i = 0; i < len; i++) {
-      byte b1 = encodedReferenceKey[i];
-      byte b2 = otherEncodedValueComparator.encodedReferenceKey[i];
-      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
-      if (result != 0) {
-        return ascending ? -result : result;
-      }
-    }
-    int result =
-        encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length;
-    return ascending ? -result : result;
-  }
-
-
-  @Override
-  public int compare(byte[] first, byte[] second) {
-    int len = Math.min(first.length, second.length);
-    for (int i = 0; i < len; i++) {
-      byte b1 = first[i];
-      byte b2 = second[i];
-      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
-      if (result != 0) {
-        return ascending ? result : -result;
-      }
-    }
-    int result = first.length - second.length;
-    return ascending ? result : -result;
-  }
-
-  @Override
-  public int compareSerialized(
-      DataInputView firstSource,
-      DataInputView secondSource) throws IOException {
-    int lengthFirst = firstSource.readInt();
-    int lengthSecond = secondSource.readInt();
-
-    int len = Math.min(lengthFirst, lengthSecond);
-    for (int i = 0; i < len; i++) {
-      byte b1 = firstSource.readByte();
-      byte b2 = secondSource.readByte();
-      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
-      if (result != 0) {
-        return ascending ? result : -result;
-      }
-    }
-
-    int result = lengthFirst - lengthSecond;
-    return ascending ? result : -result;
-  }
-
-
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    // disabled because this seems to not work with some coders,
-    // such as the AvroCoder
-    return false;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  @Override
-  public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) {
-    final int limit = offset + numBytes;
-
-    target.put(offset, record, 0, Math.min(numBytes, record.length));
-
-    offset += record.length;
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean invertNormalizedKey() {
-    return !ascending;
-  }
-
-  @Override
-  public TypeComparator<byte[]> duplicate() {
-    return new EncodedValueComparator(ascending);
-  }
-
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    target[index] = record;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] { this.duplicate() };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
deleted file mode 100644
index 41db61e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
- */
-public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
-
-  private static final long serialVersionUID = 1L;
-
-  private static final byte[] EMPTY = new byte[0];
-
-  @Override
-  public boolean isImmutableType() {
-    return true;
-  }
-
-  @Override
-  public byte[] createInstance() {
-    return EMPTY;
-  }
-
-  @Override
-  public byte[] copy(byte[] from) {
-    return from;
-  }
-
-  @Override
-  public byte[] copy(byte[] from, byte[] reuse) {
-    return copy(from);
-  }
-
-  @Override
-  public int getLength() {
-    return -1;
-  }
-
-
-  @Override
-  public void serialize(byte[] record, DataOutputView target) throws IOException {
-    if (record == null) {
-      throw new IllegalArgumentException("The record must not be null.");
-    }
-
-    final int len = record.length;
-    target.writeInt(len);
-    target.write(record);
-  }
-
-  @Override
-  public byte[] deserialize(DataInputView source) throws IOException {
-    final int len = source.readInt();
-    byte[] result = new byte[len];
-    source.readFully(result);
-    return result;
-  }
-
-  @Override
-  public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
-    return deserialize(source);
-  }
-
-  @Override
-  public void copy(DataInputView source, DataOutputView target) throws IOException {
-    final int len = source.readInt();
-    target.writeInt(len);
-    target.write(source, len);
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof EncodedValueSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return this.getClass().hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return obj instanceof EncodedValueSerializer;
-  }
-
-  @Override
-  public TypeSerializer<byte[]> duplicate() {
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
deleted file mode 100644
index e24bf31..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * Flink {@link TypeInformation} for Beam values that have been encoded to byte data
- * by a {@link Coder}.
- */
-public class EncodedValueTypeInformation
-    extends TypeInformation<byte[]>
-    implements AtomicType<byte[]> {
-
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 0;
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 0;
-  }
-
-  @Override
-  public Class<byte[]> getTypeClass() {
-    return byte[].class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  public TypeSerializer<byte[]> createSerializer(ExecutionConfig executionConfig) {
-    return new EncodedValueSerializer();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    return other instanceof EncodedValueTypeInformation;
-  }
-
-  @Override
-  public int hashCode() {
-    return this.getClass().hashCode();
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof EncodedValueTypeInformation;
-  }
-
-  @Override
-  public String toString() {
-    return "EncodedValueTypeInformation";
-  }
-
-  @Override
-  public TypeComparator<byte[]> createComparator(
-      boolean sortOrderAscending,
-      ExecutionConfig executionConfig) {
-    return new EncodedValueComparator(sortOrderAscending);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
deleted file mode 100644
index 36b5ba3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Version of {@link java.io.ByteArrayOutputStream} that allows to retrieve the internal
- * byte[] buffer without incurring an array copy.
- */
-public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
-
-  /**
-   * Get the underlying byte array.
-   */
-  public byte[] getBuffer() {
-    return buf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
deleted file mode 100644
index 9df6836..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that extracts the key from a {@link KV} and returns
- * it in encoded form as a {@code byte} array.
- */
-public class KvKeySelector<InputT, K>
-    implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {
-
-  private final Coder<K> keyCoder;
-
-  public KvKeySelector(Coder<K> keyCoder) {
-    this.keyCoder = keyCoder;
-  }
-
-  @Override
-  public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
-    return CoderUtils.encodeToByteArray(keyCoder, value.getValue().getKey());
-  }
-
-  @Override
-  public TypeInformation<byte[]> getProducedType() {
-    return new EncodedValueTypeInformation();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
deleted file mode 100644
index 6fb3182..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.types;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
deleted file mode 100644
index 2256bb1..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-  private final byte[] serializedOptions;
-
-  /** Lazily initialized copy of deserialized options. */
-  private transient PipelineOptions pipelineOptions;
-
-  public SerializedPipelineOptions(PipelineOptions options) {
-    checkNotNull(options, "PipelineOptions must not be null.");
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      new ObjectMapper().writeValue(baos, options);
-      this.serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    if (pipelineOptions == null) {
-      try {
-        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-
-        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
-        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
-      } catch (IOException e) {
-        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-      }
-    }
-
-    return pipelineOptions;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
deleted file mode 100644
index 5dedd53..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
deleted file mode 100644
index 82a2c4e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.flink.core.memory.DataInputView;
-
-/**
- * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
- * {@link org.apache.flink.core.memory.DataInputView} while
- * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
- * {@link java.io.InputStream}.
- */
-public class DataInputViewWrapper extends InputStream {
-
-  private DataInputView inputView;
-
-  public DataInputViewWrapper(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  public void setInputView(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  @Override
-  public int read() throws IOException {
-    try {
-      return inputView.readUnsignedByte();
-    } catch (EOFException e) {
-      // translate between DataInput and InputStream,
-      // DataInput signals EOF by exception, InputStream does it by returning -1
-      return -1;
-    }
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return inputView.read(b, off, len);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
deleted file mode 100644
index f2d9db2..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
- * Flink writes data using a {@link org.apache.flink.core.memory.DataOutputView} while
- * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
- * {@link java.io.OutputStream}.
- */
-public class DataOutputViewWrapper extends OutputStream {
-
-  private DataOutputView outputView;
-
-  public DataOutputViewWrapper(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  public void setOutputView(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    outputView.write(b);
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    outputView.write(b, off, len);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 70d97e3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-/**
- * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<InputT, OutputT>
-    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
-
-  private OutputT aa;
-  private Combine.CombineFn<InputT, ?, OutputT> combiner;
-
-  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
-    this.combiner = combiner;
-    resetLocal();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void add(InputT value) {
-    this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) aa;
-  }
-
-  @Override
-  public void resetLocal() {
-    this.aa = combiner.apply(ImmutableList.<InputT>of());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<InputT, Serializable> other) {
-    this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
-  }
-
-  @Override
-  public void addValue(InputT value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "Aggregator :" + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
-    return combiner;
-  }
-
-  @Override
-  public Accumulator<InputT, Serializable> clone() {
-    try {
-      super.clone();
-    } catch (CloneNotSupportedException e) {
-      // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
-      throw new RuntimeException(e);
-    }
-
-    // copy it by merging
-    OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
-    SerializableFnAggregatorWrapper<InputT, OutputT> result = new
-        SerializableFnAggregatorWrapper<>(combiner);
-
-    result.aa = resultCopy;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
deleted file mode 100644
index a87472b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
- */
-public class SourceInputFormat<T>
-    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
-  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
-
-  private final BoundedSource<T> initialSource;
-
-  private transient PipelineOptions options;
-  private final SerializedPipelineOptions serializedOptions;
-
-  private transient BoundedSource.BoundedReader<T> reader;
-  private boolean inputAvailable = false;
-
-  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
-    this.initialSource = initialSource;
-    this.serializedOptions = new SerializedPipelineOptions(options);
-  }
-
-  @Override
-  public void configure(Configuration configuration) {
-    options = serializedOptions.getPipelineOptions();
-  }
-
-  @Override
-  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
-    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
-    inputAvailable = reader.start();
-  }
-
-  @Override
-  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
-    try {
-      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-
-      return new BaseStatistics() {
-        @Override
-        public long getTotalInputSize() {
-          return estimatedSize;
-        }
-
-        @Override
-        public long getNumberOfRecords() {
-          return BaseStatistics.NUM_RECORDS_UNKNOWN;
-        }
-
-        @Override
-        public float getAverageRecordWidth() {
-          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
-        }
-      };
-    } catch (Exception e) {
-      LOG.warn("Could not read Source statistics: {}", e);
-    }
-
-    return null;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
-    try {
-      long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
-      List<? extends Source<T>> shards =
-          initialSource.split(desiredSizeBytes, options);
-      int numShards = shards.size();
-      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
-      for (int i = 0; i < numShards; i++) {
-        sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
-      }
-      return sourceInputSplits;
-    } catch (Exception e) {
-      throw new IOException("Could not create input splits from Source.", e);
-    }
-  }
-
-  @Override
-  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
-    return new DefaultInputSplitAssigner(sourceInputSplits);
-  }
-
-
-  @Override
-  public boolean reachedEnd() throws IOException {
-    return !inputAvailable;
-  }
-
-  @Override
-  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
-    if (inputAvailable) {
-      final T current = reader.getCurrent();
-      final Instant timestamp = reader.getCurrentTimestamp();
-      // advance reader to have a record ready next time
-      inputAvailable = reader.advance();
-      return WindowedValue.of(
-          current,
-          timestamp,
-          GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-    }
-
-    return null;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // TODO null check can be removed once FLINK-3796 is fixed
-    if (reader != null) {
-      reader.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
deleted file mode 100644
index e4a7386..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.sdk.io.Source;
-import org.apache.flink.core.io.InputSplit;
-
-/**
- * {@link org.apache.flink.core.io.InputSplit} for
- * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass
- * the sharded Source around in the input split because Sources simply split up into several
- * Sources for sharding. This is different to how Flink creates a separate InputSplit from
- * an InputFormat.
- */
-public class SourceInputSplit<T> implements InputSplit {
-
-  private Source<T> source;
-  private int splitNumber;
-
-  public SourceInputSplit() {
-  }
-
-  public SourceInputSplit(Source<T> source, int splitNumber) {
-    this.source = source;
-    this.splitNumber = splitNumber;
-  }
-
-  @Override
-  public int getSplitNumber() {
-    return splitNumber;
-  }
-
-  public Source<T> getSource() {
-    return source;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
deleted file mode 100644
index 72f7deb..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
deleted file mode 100644
index 8a09286..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ /dev/null
@@ -1,774 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.StatefulDoFnRunner;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsList;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.joda.time.Instant;
-
-/**
- * Flink operator for executing {@link DoFn DoFns}.
- *
- * @param <InputT> the input type of the {@link DoFn}
- * @param <FnOutputT> the output type of the {@link DoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output
- *                 type when we have additional tagged outputs
- */
-public class DoFnOperator<InputT, FnOutputT, OutputT>
-    extends AbstractStreamOperator<OutputT>
-    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
-      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
-    KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
-
-  protected DoFn<InputT, FnOutputT> doFn;
-
-  protected final SerializedPipelineOptions serializedOptions;
-
-  protected final TupleTag<FnOutputT> mainOutputTag;
-  protected final List<TupleTag<?>> additionalOutputTags;
-
-  protected final Collection<PCollectionView<?>> sideInputs;
-  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
-
-  protected final WindowingStrategy<?, ?> windowingStrategy;
-
-  protected final OutputManagerFactory<OutputT> outputManagerFactory;
-
-  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
-  protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
-
-  protected transient SideInputHandler sideInputHandler;
-
-  protected transient SideInputReader sideInputReader;
-
-  protected transient DoFnRunners.OutputManager outputManager;
-
-  private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
-
-  protected transient long currentInputWatermark;
-
-  protected transient long currentOutputWatermark;
-
-  private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
-
-  protected transient FlinkStateInternals<?> stateInternals;
-
-  private Coder<WindowedValue<InputT>> inputCoder;
-
-  private final Coder<?> keyCoder;
-
-  private final TimerInternals.TimerDataCoder timerCoder;
-
-  protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
-
-  protected transient FlinkTimerInternals timerInternals;
-
-  private transient StateInternals<?> pushbackStateInternals;
-
-  private transient Optional<Long> pushedBackWatermark;
-
-  public DoFnOperator(
-      DoFn<InputT, FnOutputT> doFn,
-      Coder<WindowedValue<InputT>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
-      List<TupleTag<?>> additionalOutputTags,
-      OutputManagerFactory<OutputT> outputManagerFactory,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      Collection<PCollectionView<?>> sideInputs,
-      PipelineOptions options,
-      Coder<?> keyCoder) {
-    this.doFn = doFn;
-    this.inputCoder = inputCoder;
-    this.mainOutputTag = mainOutputTag;
-    this.additionalOutputTags = additionalOutputTags;
-    this.sideInputTagMapping = sideInputTagMapping;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
-    this.windowingStrategy = windowingStrategy;
-    this.outputManagerFactory = outputManagerFactory;
-
-    setChainingStrategy(ChainingStrategy.ALWAYS);
-
-    this.keyCoder = keyCoder;
-
-    this.timerCoder =
-        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
-  }
-
-  private ExecutionContext.StepContext createStepContext() {
-    return new StepContext();
-  }
-
-  // allow overriding this in WindowDoFnOperator because this one dynamically creates
-  // the DoFn
-  protected DoFn<InputT, FnOutputT> getDoFn() {
-    return doFn;
-  }
-
-  @Override
-  public void open() throws Exception {
-    super.open();
-
-    currentInputWatermark = Long.MIN_VALUE;
-    currentOutputWatermark = Long.MIN_VALUE;
-
-    AggregatorFactory aggregatorFactory = new AggregatorFactory() {
-      @Override
-      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-          Class<?> fnClass,
-          ExecutionContext.StepContext stepContext,
-          String aggregatorName,
-          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
-        @SuppressWarnings("unchecked")
-        SerializableFnAggregatorWrapper<InputT, OutputT> result =
-            (SerializableFnAggregatorWrapper<InputT, OutputT>)
-                getRuntimeContext().getAccumulator(aggregatorName);
-
-        if (result == null) {
-          result = new SerializableFnAggregatorWrapper<>(combine);
-          getRuntimeContext().addAccumulator(aggregatorName, result);
-        }
-        return result;
-      }
-    };
-
-    sideInputReader = NullSideInputReader.of(sideInputs);
-
-    if (!sideInputs.isEmpty()) {
-
-      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-
-      FlinkBroadcastStateInternals sideInputStateInternals =
-          new FlinkBroadcastStateInternals<>(
-              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
-
-      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
-      sideInputReader = sideInputHandler;
-
-      // maybe init by initializeState
-      if (pushbackStateInternals == null) {
-        if (keyCoder != null) {
-          pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
-              getKeyedStateBackend());
-        } else {
-          pushbackStateInternals =
-              new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
-        }
-      }
-
-      pushedBackWatermark = Optional.absent();
-
-    }
-
-    outputManager = outputManagerFactory.create(output);
-
-    // StatefulPardo or WindowDoFn
-    if (keyCoder != null) {
-      stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
-          keyCoder);
-
-      timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
-          getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
-
-      timerInternals = new FlinkTimerInternals();
-
-    }
-
-    // WindowDoFnOperator need use state and timer to get DoFn.
-    // So must wait StateInternals and TimerInternals ready.
-    this.doFn = getDoFn();
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-
-    doFnInvoker.invokeSetup();
-
-    ExecutionContext.StepContext stepContext = createStepContext();
-
-    doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(),
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        additionalOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-
-    if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
-      // When the doFn is this, we know it came from WindowDoFnOperator and
-      //   InputT = KeyedWorkItem<K, V>
-      //   OutputT = KV<K, V>
-      //
-      // for some K, V
-
-
-      doFnRunner = DoFnRunners.lateDataDroppingRunner(
-          (DoFnRunner) doFnRunner,
-          stepContext,
-          windowingStrategy,
-          ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
-    } else if (keyCoder != null) {
-      // It is a stateful DoFn
-
-      StatefulDoFnRunner.CleanupTimer cleanupTimer =
-          new StatefulDoFnRunner.TimeInternalsCleanupTimer(
-              stepContext.timerInternals(), windowingStrategy);
-
-      // we don't know the window type
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
-          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
-              doFn, stepContext.stateInternals(), windowCoder);
-
-      doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
-          doFn,
-          doFnRunner,
-          stepContext,
-          aggregatorFactory,
-          windowingStrategy,
-          cleanupTimer,
-          stateCleaner);
-    }
-
-    pushbackDoFnRunner =
-        SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    doFnInvoker.invokeTeardown();
-  }
-
-  protected final long getPushbackWatermarkHold() {
-    // if we don't have side inputs we never hold the watermark
-    if (sideInputs.isEmpty()) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-    }
-
-    try {
-      checkInitPushedBackWatermark();
-      return pushedBackWatermark.get();
-    } catch (Exception e) {
-      throw new RuntimeException("Error retrieving pushed back watermark state.", e);
-    }
-  }
-
-  private void checkInitPushedBackWatermark() {
-    // init and restore from pushedBack state.
-    // Not done in initializeState, because OperatorState is not ready.
-    if (!pushedBackWatermark.isPresent()) {
-
-      BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-      long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-      for (WindowedValue<InputT> value : pushedBack.read()) {
-        min = Math.min(min, value.getTimestamp().getMillis());
-      }
-      setPushedBackWatermark(min);
-    }
-  }
-
-  @Override
-  public final void processElement(
-      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    doFnRunner.startBundle();
-    doFnRunner.processElement(streamRecord.getValue());
-    doFnRunner.finishBundle();
-  }
-
-  private void setPushedBackWatermark(long watermark) {
-    pushedBackWatermark = Optional.fromNullable(watermark);
-  }
-
-  @Override
-  public final void processElement1(
-      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    pushbackDoFnRunner.startBundle();
-    Iterable<WindowedValue<InputT>> justPushedBack =
-        pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
-
-    BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    checkInitPushedBackWatermark();
-
-    long min = pushedBackWatermark.get();
-    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
-      pushedBack.add(pushedBackValue);
-    }
-    setPushedBackWatermark(min);
-    pushbackDoFnRunner.finishBundle();
-  }
-
-  @Override
-  public final void processElement2(
-      StreamRecord<RawUnionValue> streamRecord) throws Exception {
-    pushbackDoFnRunner.startBundle();
-
-    @SuppressWarnings("unchecked")
-    WindowedValue<Iterable<?>> value =
-        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
-
-    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
-    sideInputHandler.addSideInputValue(sideInput, value);
-
-    BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
-    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-    if (pushedBackContents != null) {
-      for (WindowedValue<InputT> elem : pushedBackContents) {
-
-        // we need to set the correct key in case the operator is
-        // a (keyed) window operator
-        setKeyContextElement1(new StreamRecord<>(elem));
-
-        Iterable<WindowedValue<InputT>> justPushedBack =
-            pushbackDoFnRunner.processElementInReadyWindows(elem);
-        Iterables.addAll(newPushedBack, justPushedBack);
-      }
-    }
-
-    pushedBack.clear();
-    long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
-      pushedBack.add(pushedBackValue);
-    }
-    setPushedBackWatermark(min);
-
-    pushbackDoFnRunner.finishBundle();
-
-    // maybe output a new watermark
-    processWatermark1(new Watermark(currentInputWatermark));
-  }
-
-  @Override
-  public void processWatermark(Watermark mark) throws Exception {
-    processWatermark1(mark);
-  }
-
-  @Override
-  public void processWatermark1(Watermark mark) throws Exception {
-    if (keyCoder == null) {
-      this.currentInputWatermark = mark.getTimestamp();
-      long potentialOutputWatermark =
-          Math.min(getPushbackWatermarkHold(), currentInputWatermark);
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        currentOutputWatermark = potentialOutputWatermark;
-        output.emitWatermark(new Watermark(currentOutputWatermark));
-      }
-    } else {
-      // fireTimers, so we need startBundle.
-      pushbackDoFnRunner.startBundle();
-
-      this.currentInputWatermark = mark.getTimestamp();
-
-      // hold back by the pushed back values waiting for side inputs
-      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
-
-      timerService.advanceWatermark(actualInputWatermark);
-
-      Instant watermarkHold = stateInternals.watermarkHold();
-
-      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
-
-      long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
-
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        currentOutputWatermark = potentialOutputWatermark;
-        output.emitWatermark(new Watermark(currentOutputWatermark));
-      }
-      pushbackDoFnRunner.finishBundle();
-    }
-  }
-
-  @Override
-  public void processWatermark2(Watermark mark) throws Exception {
-    // ignore watermarks from the side-input input
-  }
-
-  @Override
-  public void snapshotState(StateSnapshotContext context) throws Exception {
-    // copy from AbstractStreamOperator
-    if (getKeyedStateBackend() != null) {
-      KeyedStateCheckpointOutputStream out;
-
-      try {
-        out = context.getRawKeyedOperatorStateOutput();
-      } catch (Exception exception) {
-        throw new Exception("Could not open raw keyed operator state stream for "
-            + getOperatorName() + '.', exception);
-      }
-
-      try {
-        KeyGroupsList allKeyGroups = out.getKeyGroupList();
-        for (int keyGroupIdx : allKeyGroups) {
-          out.startNewKeyGroup(keyGroupIdx);
-
-          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-
-          // if (this instanceof KeyGroupCheckpointedOperator)
-          snapshotKeyGroupState(keyGroupIdx, dov);
-
-          // We can't get all timerServices, so we just snapshot our timerService
-          // Maybe this is a normal DoFn that has no timerService
-          if (keyCoder != null) {
-            timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
-          }
-
-        }
-      } catch (Exception exception) {
-        throw new Exception("Could not write timer service of " + getOperatorName()
-            + " to checkpoint state stream.", exception);
-      } finally {
-        try {
-          out.close();
-        } catch (Exception closeException) {
-          LOG.warn("Could not close raw keyed operator state stream for {}. This "
-              + "might have prevented deleting some state data.", getOperatorName(),
-              closeException);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
-    if (!sideInputs.isEmpty() && keyCoder != null) {
-      ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
-          keyGroupIndex, out);
-    }
-  }
-
-  @Override
-  public void initializeState(StateInitializationContext context) throws Exception {
-    if (getKeyedStateBackend() != null) {
-      int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
-      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
-
-      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
-        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
-
-        int keyGroupIdx = streamProvider.getKeyGroupId();
-        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
-            "Key Group " + keyGroupIdx + " does not belong to the local range.");
-
-        // if (this instanceof KeyGroupRestoringOperator)
-        restoreKeyGroupState(keyGroupIdx, div);
-
-        // We just initialize our timerService
-        if (keyCoder != null) {
-          if (timerService == null) {
-            timerService = new HeapInternalTimerService<>(
-                totalKeyGroups,
-                localKeyGroupRange,
-                this,
-                getRuntimeContext().getProcessingTimeService());
-          }
-          timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
-        }
-      }
-    }
-  }
-
-  @Override
-  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
-    if (!sideInputs.isEmpty() && keyCoder != null) {
-      if (pushbackStateInternals == null) {
-        pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
-            getKeyedStateBackend());
-      }
-      ((FlinkKeyGroupStateInternals) pushbackStateInternals)
-          .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
-    }
-  }
-
-  @Override
-  public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
-    fireTimer(timer);
-  }
-
-  @Override
-  public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
-    fireTimer(timer);
-  }
-
-  // allow overriding this in WindowDoFnOperator
-  public void fireTimer(InternalTimer<?, TimerData> timer) {
-    TimerInternals.TimerData timerData = timer.getNamespace();
-    StateNamespace namespace = timerData.getNamespace();
-    // This is a user timer, so namespace must be WindowNamespace
-    checkArgument(namespace instanceof WindowNamespace);
-    BoundedWindow window = ((WindowNamespace) namespace).getWindow();
-    pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
-        timerData.getTimestamp(), timerData.getDomain());
-  }
-
-  /**
-   * Factory for creating an {@link DoFnRunners.OutputManager} from
-   * a Flink {@link Output}.
-   */
-  interface OutputManagerFactory<OutputT> extends Serializable {
-    DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
-  }
-
-  /**
-   * Default implementation of {@link OutputManagerFactory} that creates an
-   * {@link DoFnRunners.OutputManager} that only writes to
-   * a single logical output.
-   */
-  public static class DefaultOutputManagerFactory<OutputT>
-      implements OutputManagerFactory<OutputT> {
-    @Override
-    public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
-      return new DoFnRunners.OutputManager() {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
-          // with tagged outputs we can't get around this because we don't
-          // know our own output type...
-          @SuppressWarnings("unchecked")
-          OutputT castValue = (OutputT) value;
-          output.collect(new StreamRecord<>(castValue));
-        }
-      };
-    }
-  }
-
-  /**
-   * Implementation of {@link OutputManagerFactory} that creates an
-   * {@link DoFnRunners.OutputManager} that can write to multiple logical
-   * outputs by unioning them in a {@link RawUnionValue}.
-   */
-  public static class MultiOutputOutputManagerFactory
-      implements OutputManagerFactory<RawUnionValue> {
-
-    Map<TupleTag<?>, Integer> mapping;
-
-    public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
-      this.mapping = mapping;
-    }
-
-    @Override
-    public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
-      return new DoFnRunners.OutputManager() {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
-          int intTag = mapping.get(tag);
-          output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
-        }
-      };
-    }
-  }
-
-  /**
-   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
-   * accessing state or timer internals.
-   */
-  protected class StepContext implements ExecutionContext.StepContext {
-
-    @Override
-    public String getStepName() {
-      return null;
-    }
-
-    @Override
-    public String getTransformName() {
-      return null;
-    }
-
-    @Override
-    public void noteOutput(WindowedValue<?> output) {}
-
-    @Override
-    public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Writing side-input data is not supported.");
-    }
-
-    @Override
-    public StateInternals<?> stateInternals() {
-      return stateInternals;
-    }
-
-    @Override
-    public TimerInternals timerInternals() {
-      return timerInternals;
-    }
-  }
-
-  private class FlinkTimerInternals implements TimerInternals {
-
-    @Override
-    public void setTimer(
-        StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
-    }
-
-    @Deprecated
-    @Override
-    public void setTimer(TimerData timerKey) {
-      long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.registerEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.registerProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
-      }
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException(
-          "Canceling of a timer by ID is not yet supported.");
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-      throw new UnsupportedOperationException(
-          "Canceling of a timer by ID is not yet supported.");
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.deleteEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.deleteProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
-      }
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return new Instant(timerService.currentProcessingTime());
-    }
-
-    @Nullable
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      return new Instant(timerService.currentProcessingTime());
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
-    }
-
-    @Nullable
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      return new Instant(currentOutputWatermark);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
deleted file mode 100644
index dce2e68..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that retrieves a key from a {@link KV}. This will return
- * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures
- * that all key comparisons/hashing happen on the encoded form.
- */
-public class KvToByteBufferKeySelector<K, V>
-    implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
-    ResultTypeQueryable<ByteBuffer> {
-
-  private final Coder<K> keyCoder;
-
-  public KvToByteBufferKeySelector(Coder<K> keyCoder) {
-    this.keyCoder = keyCoder;
-  }
-
-  @Override
-  public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception {
-    K key = value.getValue().getKey();
-    byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
-    return ByteBuffer.wrap(keyBytes);
-  }
-
-  @Override
-  public TypeInformation<ByteBuffer> getProducedType() {
-    return new GenericTypeInfo<>(ByteBuffer.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
deleted file mode 100644
index e843660..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed word item.
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
-  final K key;
-  final WindowedValue<ElemT> value;
-
-  public SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  @Override
-  public K key() {
-    return key;
-  }
-
-  public WindowedValue<ElemT> value() {
-    return value;
-  }
-
-  @Override
-  public Iterable<TimerInternals.TimerData> timersIterable() {
-    return Collections.EMPTY_LIST;
-  }
-
-  @Override
-  public Iterable<WindowedValue<ElemT>> elementsIterable() {
-    return Collections.singletonList(value);
-  }
-}