You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/06/19 09:36:09 UTC

[1/2] incubator-beam git commit: [BEAM-321] Fix Flink Comparators

Repository: incubator-beam
Updated Branches:
  refs/heads/master 69a4141a5 -> 0e4d0a9ae


[BEAM-321] Fix Flink Comparators

KvCoderComparator and CoderComparator were hashing the decoded key object
directly (via hashCode()) while doing comparisons on the encoded form.
This led to inconsistencies in GroupByKey results for keys with large
numbers of elements.

This changes the comparators to hash on the encoded form and also adds
tests to verify the correct behavior.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93ca508b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93ca508b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93ca508b

Branch: refs/heads/master
Commit: 93ca508bbdf558772e909df099148a9d03a1e6d4
Parents: 69a4141
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 1 11:56:18 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Jun 19 10:31:29 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  10 +
 .../beam/runners/flink/FlinkRunnerResult.java   |   8 +
 .../FlinkBatchTransformTranslators.java         |  49 +---
 .../FlinkBatchTranslationContext.java           |  13 +-
 .../translation/types/CoderComparator.java      | 217 ----------------
 .../translation/types/CoderTypeInformation.java |  14 +-
 .../translation/types/CoderTypeSerializer.java  |  37 +--
 .../types/EncodedValueComparator.java           | 197 ++++++++++++++
 .../types/EncodedValueSerializer.java           | 113 ++++++++
 .../types/EncodedValueTypeInformation.java      | 111 ++++++++
 .../translation/types/KvCoderComperator.java    | 259 -------------------
 .../types/KvCoderTypeInformation.java           | 207 ---------------
 .../flink/translation/types/KvKeySelector.java  |  51 ++++
 .../flink/EncodedValueComparatorTest.java       |  71 +++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 206 ++++++++++++++-
 15 files changed, 799 insertions(+), 764 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 8933457..33c13bf 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -50,6 +50,16 @@
       <version>${flink.version}</version>
     </dependency>
 
+    <!-- For testing -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+
     <!-- Beam -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 481d867..a8f4cac 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -65,4 +65,12 @@ public class FlinkRunnerResult implements PipelineResult {
           new RuntimeException("Accumulator does not exist."));
     }
   }
+
+  @Override
+  public String toString() {  // debug form listing the aggregators and runtime fields
+    return "FlinkRunnerResult{" +
+        "aggregators=" + aggregators +
+        ", runtime=" + runtime +
+        '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 200e4af..ac058b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruni
 import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -65,7 +65,6 @@ import com.google.common.collect.Maps;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
@@ -75,7 +74,6 @@ import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
@@ -207,23 +205,15 @@ class FlinkBatchTransformTranslators {
       WindowingStrategy<?, ?> windowingStrategy =
           context.getInput(transform).getWindowingStrategy();
 
-      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
-          new KvCoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  inputCoder,
-                  windowingStrategy.getWindowFn().windowCoder()));
-
       TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
-          new KvCoderTypeInformation<>(
+          new CoderTypeInformation<>(
               WindowedValue.getFullCoder(
                   KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
                   windowingStrategy.getWindowFn().windowCoder()));
 
+
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          new UnsortedGrouping<>(
-              inputDataSet,
-              new Keys.ExpressionKeys<>(new String[]{"key"},
-                  kvCoderTypeInformation));
+          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
 
       FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
       FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
@@ -279,8 +269,7 @@ class FlinkBatchTransformTranslators {
               "GroupCombine: " + transform.getName());
 
       Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
-          new UnsortedGrouping<>(
-              groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+          groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
 
       // Fully reduce the values and create output format VO
       GroupReduceOperator<
@@ -291,6 +280,7 @@ class FlinkBatchTransformTranslators {
       context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
     }
+
   }
 
   /**
@@ -373,23 +363,13 @@ class FlinkBatchTransformTranslators {
       WindowingStrategy<?, ?> windowingStrategy =
           context.getInput(transform).getWindowingStrategy();
 
-      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
-          new KvCoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  inputCoder,
-                  windowingStrategy.getWindowFn().windowCoder()));
-
       TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
-          new KvCoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-                  windowingStrategy.getWindowFn().windowCoder()));
+          context.getTypeInfo(
+              KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+              windowingStrategy);
 
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          new UnsortedGrouping<>(
-              inputDataSet,
-              new Keys.ExpressionKeys<>(new String[]{"key"},
-                  kvCoderTypeInformation));
+          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
 
       // construct a map from side input to WindowingStrategy so that
       // the DoFn runner can map main-input windows to side input windows
@@ -432,9 +412,7 @@ class FlinkBatchTransformTranslators {
             context.getTypeInfo(context.getOutput(transform));
 
         Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
-            new UnsortedGrouping<>(
-                groupCombine,
-                new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+            groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
 
         // Fully reduce the values and create output format OutputT
         GroupReduceOperator<
@@ -469,9 +447,7 @@ class FlinkBatchTransformTranslators {
             context.getTypeInfo(context.getOutput(transform));
 
         Grouping<WindowedValue<KV<K, InputT>>> grouping =
-            new UnsortedGrouping<>(
-                inputDataSet,
-                new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+            inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
 
         // Fully reduce the values and create output format OutputT
         GroupReduceOperator<
@@ -720,4 +696,5 @@ class FlinkBatchTransformTranslators {
   }
 
   private FlinkBatchTransformTranslators() {}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index ecc3a65..a73bf13 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
@@ -119,11 +120,17 @@ public class FlinkBatchTranslationContext {
 
   @SuppressWarnings("unchecked")
   public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
-    Coder<T> valueCoder = collection.getCoder();
+    return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
+      Coder<T> coder,
+      WindowingStrategy<?, ?> windowingStrategy) {
     WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
         WindowedValue.getFullCoder(
-            valueCoder,
-            collection.getWindowingStrategy().getWindowFn().windowCoder());
+            coder,
+            windowingStrategy.getWindowFn().windowCoder());
 
     return new CoderTypeInformation<>(windowedValueCoder);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
deleted file mode 100644
index e06741c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.Coder;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
- * {@link org.apache.beam.sdk.coders.Coder}.
- */
-public class CoderComparator<T> extends TypeComparator<T> {
-
-  private Coder<T> coder;
-
-  // We use these for internal encoding/decoding for creating copies and comparing
-  // serialized forms using a Coder
-  private transient InspectableByteArrayOutputStream buffer1;
-  private transient InspectableByteArrayOutputStream buffer2;
-
-  // For storing the Reference in encoded form
-  private transient InspectableByteArrayOutputStream referenceBuffer;
-
-  public CoderComparator(Coder<T> coder) {
-    this.coder = coder;
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  @Override
-  public int hash(T record) {
-    return record.hashCode();
-  }
-
-  @Override
-  public void setReference(T toCompare) {
-    referenceBuffer.reset();
-    try {
-      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-    }
-  }
-
-  @Override
-  public boolean equalToReference(T candidate) {
-    try {
-      buffer2.reset();
-      coder.encode(candidate, buffer2, Coder.Context.OUTER);
-      byte[] arr = referenceBuffer.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (referenceBuffer.size() != buffer2.size()) {
-        return false;
-      }
-      int len = buffer2.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return false;
-        }
-      }
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public int compareToReference(TypeComparator<T> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
-
-    byte[] arr = referenceBuffer.getBuffer();
-    byte[] arrOther = otherReferenceBuffer.getBuffer();
-    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-      return referenceBuffer.size() - otherReferenceBuffer.size();
-    }
-    int len = referenceBuffer.size();
-    for (int i = 0; i < len; i++) {
-      if (arr[i] != arrOther[i]) {
-        return arr[i] - arrOther[i];
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int compare(T first, T second) {
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      coder.encode(first, buffer1, Coder.Context.OUTER);
-      coder.encode(second, buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare: ", e);
-    }
-  }
-
-  @Override
-  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
-    T first = serializer.deserialize(firstSource);
-    T second = serializer.deserialize(secondSource);
-    return compare(first, second);
-  }
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    return true;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  @Override
-  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-    buffer1.reset();
-    try {
-      coder.encode(record, buffer1, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-    }
-    final byte[] data = buffer1.getBuffer();
-    final int limit = offset + numBytes;
-
-    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
-
-    offset += buffer1.size();
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean invertNormalizedKey() {
-    return false;
-  }
-
-  @Override
-  public TypeComparator<T> duplicate() {
-    return new CoderComparator<>(coder);
-  }
-
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    target[index] = record;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] { this.duplicate() };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 4434cf8..0e85486 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.flink.translation.types;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.util.WindowedValue;
 
 import com.google.common.base.Preconditions;
 
@@ -107,19 +105,13 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
 
   @Override
   public String toString() {
-    return "CoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
+    return "CoderTypeInformation{coder=" + coder + '}';
   }
 
   @Override
   public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
       executionConfig) {
-    WindowedValue.WindowedValueCoder windowCoder = (WindowedValue.WindowedValueCoder) coder;
-    if (windowCoder.getValueCoder() instanceof KvCoder) {
-      return new KvCoderComperator(windowCoder);
-    } else {
-      return new CoderComparator<>(coder);
-    }
+    throw new UnsupportedOperationException(
+        "Non-encoded values cannot be compared directly.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 097316b..4621951 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -21,6 +21,7 @@ import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -38,28 +39,11 @@ import java.io.ObjectInputStream;
 public class CoderTypeSerializer<T> extends TypeSerializer<T> {
   
   private Coder<T> coder;
-  private transient DataInputViewWrapper inputWrapper;
-  private transient DataOutputViewWrapper outputWrapper;
-
-  // We use this for internal encoding/decoding for creating copies using the Coder.
-  private transient InspectableByteArrayOutputStream buffer;
 
   public CoderTypeSerializer(Coder<T> coder) {
     this.coder = coder;
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
-
-    buffer = new InspectableByteArrayOutputStream();
   }
-  
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
 
-    buffer = new InspectableByteArrayOutputStream();
-  }
-  
   @Override
   public boolean isImmutableType() {
     return false;
@@ -77,17 +61,10 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
   @Override
   public T copy(T t) {
-    buffer.reset();
-    try {
-      coder.encode(t, buffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
-    }
     try {
-      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer
-          .size()), Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
+      return CoderUtils.clone(coder, t);
+    } catch (CoderException e) {
+      throw new RuntimeException("Could not clone.", e);
     }
   }
 
@@ -98,19 +75,19 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
   @Override
   public int getLength() {
-    return 0;
+    return -1;
   }
 
   @Override
   public void serialize(T t, DataOutputView dataOutputView) throws IOException {
-    outputWrapper.setOutputView(dataOutputView);
+    DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
     coder.encode(t, outputWrapper, Coder.Context.NESTED);
   }
 
   @Override
   public T deserialize(DataInputView dataInputView) throws IOException {
     try {
-      inputWrapper.setInputView(dataInputView);
+      DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
       return coder.decode(inputWrapper, Coder.Context.NESTED);
     } catch (CoderException e) {
       Throwable cause = e.getCause();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
new file mode 100644
index 0000000..69bcb41
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have
+ * been encoded to byte data by a {@link Coder}; hashing and ordering both use the raw bytes.
+ */
+public class EncodedValueComparator extends TypeComparator<byte[]> {
+
+  /** Encoded reference record set by setReference(); the array is stored, not copied. */
+  private transient byte[] encodedReferenceKey;
+
+  private final boolean ascending;  // false reverses the sign of every byte-order comparison
+
+  public EncodedValueComparator(boolean ascending) {
+    this.ascending = ascending;
+  }
+
+  @Override
+  public int hash(byte[] record) {
+    return Arrays.hashCode(record);  // content hash of the bytes, consistent with byte equality
+  }
+
+  @Override
+  public void setReference(byte[] toCompare) {
+    this.encodedReferenceKey = toCompare;  // no defensive copy is made
+  }
+
+  @Override
+  public boolean equalToReference(byte[] candidate) {  // exact byte-for-byte equality
+    if (encodedReferenceKey.length != candidate.length) {
+      return false;
+    }
+    int len = candidate.length;
+    for (int i = 0; i < len; i++) {
+      if (encodedReferenceKey[i] != candidate[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<byte[]> other) {
+    // VERY IMPORTANT: compareToReference does not behave like Comparable.compare;
+    // the sign is inverted (ascending: returns compare(other.reference, this.reference)).
+
+    EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other;
+
+    int len = Math.min(
+        encodedReferenceKey.length,
+        otherEncodedValueComparator.encodedReferenceKey.length);
+
+    for (int i = 0; i < len; i++) {
+      byte b1 = encodedReferenceKey[i];
+      byte b2 = otherEncodedValueComparator.encodedReferenceKey[i];
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));  // signed byte comparison
+      if (result != 0) {
+        return ascending ? -result : result;
+      }
+    }
+    int result =
+        encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length;
+    return ascending ? -result : result;
+  }
+
+
+  @Override
+  public int compare(byte[] first, byte[] second) {
+    int len = Math.min(first.length, second.length);
+    for (int i = 0; i < len; i++) {
+      byte b1 = first[i];
+      byte b2 = second[i];
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));  // signed, so 0x80..0xFF sort below 0x00
+      if (result != 0) {
+        return ascending ? result : -result;
+      }
+    }
+    int result = first.length - second.length;  // equal common prefix: shorter sorts first (asc)
+    return ascending ? result : -result;
+  }
+
+  @Override
+  public int compareSerialized(
+      DataInputView firstSource,
+      DataInputView secondSource) throws IOException {
+    int lengthFirst = firstSource.readInt();  // assumes int length prefix + bytes; TODO confirm
+    int lengthSecond = secondSource.readInt();  // against EncodedValueSerializer.serialize
+
+    int len = Math.min(lengthFirst, lengthSecond);
+    for (int i = 0; i < len; i++) {
+      byte b1 = firstSource.readByte();
+      byte b2 = secondSource.readByte();
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));  // same signed ordering as compare()
+      if (result != 0) {
+        return ascending ? result : -result;  // remaining bytes of both records stay unread
+      }
+    }
+
+    int result = lengthFirst - lengthSecond;
+    return ascending ? result : -result;
+  }
+
+
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    // Disabled because it appears not to work with some coders, such as
+    // AvroCoder; putNormalizedKey below is presumably unused while this is false.
+    return false;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) {
+    final int limit = offset + numBytes;
+
+    target.put(offset, record, 0, Math.min(numBytes, record.length));  // at most numBytes copied
+
+    offset += record.length;  // if record.length >= numBytes this passes limit: no padding
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);  // zero-pad the rest of the key slot
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return !ascending;
+  }
+
+  @Override
+  public TypeComparator<byte[]> duplicate() {
+    return new EncodedValueComparator(ascending);  // the reference key is not carried over
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    target[index] = record;  // the whole encoded record is the single key field
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] { this.duplicate() };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
new file mode 100644
index 0000000..33af8d9
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
+ */
+public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final byte[] EMPTY = new byte[0];
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public byte[] createInstance() {
+		return EMPTY;
+	}
+
+	@Override
+	public byte[] copy(byte[] from) {
+		return from;
+	}
+	
+	@Override
+	public byte[] copy(byte[] from, byte[] reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+
+	@Override
+	public void serialize(byte[] record, DataOutputView target) throws IOException {
+		if (record == null) {
+			throw new IllegalArgumentException("The record must not be null.");
+		}
+		
+		final int len = record.length;
+		target.writeInt(len);
+		target.write(record);
+	}
+
+	@Override
+	public byte[] deserialize(DataInputView source) throws IOException {
+		final int len = source.readInt();
+		byte[] result = new byte[len];
+		source.readFully(result);
+		return result;
+	}
+	
+	@Override
+	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		final int len = source.readInt();
+		target.writeInt(len);
+		target.write(source, len);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EncodedValueSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return this.getClass().hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof EncodedValueSerializer;
+	}
+
+	@Override
+	public TypeSerializer<byte[]> duplicate() {
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
new file mode 100644
index 0000000..46c854f
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+
+import java.util.Objects;
+
+/**
+ * Flink {@link TypeInformation} for Beam values that have been encoded to byte data
+ * by a {@link Coder}.
+ *
+ * <p>This is an atomic key type. Values are serialized with {@link EncodedValueSerializer} and
+ * compared with {@link EncodedValueComparator}, so grouping and sorting operate on the encoded
+ * bytes rather than on the decoded Java objects.
+ */
+public class EncodedValueTypeInformation
+    extends TypeInformation<byte[]>
+    implements AtomicType<byte[]> {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  @PublicEvolving
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  @PublicEvolving
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  @PublicEvolving
+  public int getArity() {
+    return 0;
+  }
+
+  @Override
+  @PublicEvolving
+  public int getTotalFields() {
+    return 0;
+  }
+
+  @Override
+  @PublicEvolving
+  public Class<byte[]> getTypeClass() {
+    return byte[].class;
+  }
+
+  /** Encoded values can be used as grouping/sorting keys. */
+  @Override
+  @PublicEvolving
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @PublicEvolving
+  public TypeSerializer<byte[]> createSerializer(ExecutionConfig executionConfig) {
+    return new EncodedValueSerializer();
+  }
+
+  /**
+   * All instances are interchangeable. Equality goes through {@link #canEqual} so that it stays
+   * symmetric should this class ever be subclassed.
+   */
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof EncodedValueTypeInformation
+        && ((EncodedValueTypeInformation) other).canEqual(this);
+  }
+
+  /**
+   * Uses the class literal rather than {@code getClass()} so that instances which compare
+   * equal (including subclass instances) always share a hash code.
+   */
+  @Override
+  public int hashCode() {
+    return EncodedValueTypeInformation.class.hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof EncodedValueTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "EncodedValueTypeInformation";
+  }
+
+  @Override
+  @PublicEvolving
+  public TypeComparator<byte[]> createComparator(
+      boolean sortOrderAscending,
+      ExecutionConfig executionConfig) {
+    return new EncodedValueComparator(sortOrderAscending);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
deleted file mode 100644
index 79b127d..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link TypeComparator} for {@link KvCoder}. We have a special comparator
- * for {@link KV} that always compares on the key only.
- *
- * <p>NOTE(review): {@link #hash} uses the decoded key's {@code hashCode()}, while
- * {@link #equalToReference}, {@link #compare} and {@link #compareToReference} work on the
- * key's {@code keyCoder} encoding. Keys with identical encodings but different
- * {@code hashCode()} values (or vice versa) are therefore treated inconsistently; this is the
- * GroupByKey inconsistency tracked as BEAM-321.
- *
- * <p>The scratch buffers below are shared mutable fields, so a single instance is not safe
- * for concurrent use; {@link #duplicate()} creates an instance with its own buffers.
- */
-public class KvCoderComperator <K, V> extends TypeComparator<WindowedValue<KV<K, V>>> {
-  
-  // Coder for the whole windowed KV; used by compareSerialized to decode records.
-  private final WindowedValue.WindowedValueCoder<KV<K, V>> coder;
-  // Coder for the key only; all comparisons are made on its encoded output.
-  private final Coder<K> keyCoder;
-
-  // Scratch buffers for encoding keys in compare, equalToReference and putNormalizedKey.
-  // Transient: recreated in readObject after Java deserialization.
-  private transient InspectableByteArrayOutputStream buffer1;
-  private transient InspectableByteArrayOutputStream buffer2;
-
-  // For storing the Reference in encoded form
-  private transient InspectableByteArrayOutputStream referenceBuffer;
-
-
-  // Adapts a Flink DataInputView for coder.decode in compareSerialized. Note that the whole
-  // WindowedValue is decoded there, not just the key.
-  private transient DataInputViewWrapper inputWrapper;
-
-  /**
-   * Creates a comparator for {@code coder}, whose value coder must be a {@link KvCoder}
-   * (otherwise the cast below throws {@code ClassCastException}).
-   */
-  public KvCoderComperator(WindowedValue.WindowedValueCoder<KV<K, V>> coder) {
-    this.coder = coder;
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
-    this.keyCoder = kvCoder.getKeyCoder();
-
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-
-    inputWrapper = new DataInputViewWrapper(null);
-  }
-
-  // Transient fields are not restored by Java deserialization, so rebuild them here.
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-
-    inputWrapper = new DataInputViewWrapper(null);
-  }
-
-  // NOTE(review): hashes the key object itself (0 for null), not its encoding, which is
-  // inconsistent with the encoding-based equality used by the other methods.
-  @Override
-  public int hash(WindowedValue<KV<K, V>> record) {
-    K key = record.getValue().getKey();
-    if (key != null) {
-      return key.hashCode();
-    } else {
-      return 0;
-    }
-  }
-
-  // Stores the OUTER-context encoding of the record's key for later reference comparisons.
-  // NOTE(review): the IOException cause is flattened into the message rather than chained.
-  @Override
-  public void setReference(WindowedValue<KV<K, V>> toCompare) {
-    referenceBuffer.reset();
-    try {
-      keyCoder.encode(toCompare.getValue().getKey(), referenceBuffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-    }
-  }
-
-  // True iff the candidate's encoded key is byte-for-byte equal to the reference encoding.
-  @Override
-  public boolean equalToReference(WindowedValue<KV<K, V>> candidate) {
-    try {
-      buffer2.reset();
-      keyCoder.encode(candidate.getValue().getKey(), buffer2, Coder.Context.OUTER);
-      byte[] arr = referenceBuffer.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (referenceBuffer.size() != buffer2.size()) {
-        return false;
-      }
-      int len = buffer2.size();
-      for (int i = 0; i < len; i++) {
-        if (arr[i] != arrOther[i]) {
-          return false;
-        }
-      }
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  // Orders by encoded length first (shorter first), then by the first differing byte using
-  // signed Java byte arithmetic. Assumes `other` is also a KvCoderComperator.
-  @Override
-  public int compareToReference(TypeComparator<WindowedValue<KV<K, V>>> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer =
-        ((KvCoderComperator<K, V>) other).referenceBuffer;
-
-    byte[] arr = referenceBuffer.getBuffer();
-    byte[] arrOther = otherReferenceBuffer.getBuffer();
-    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-      return referenceBuffer.size() - otherReferenceBuffer.size();
-    }
-    int len = referenceBuffer.size();
-    for (int i = 0; i < len; i++) {
-      if (arr[i] != arrOther[i]) {
-        return arr[i] - arrOther[i];
-      }
-    }
-    return 0;
-  }
-
-
-  // Same ordering as compareToReference: length first, then signed byte comparison of the
-  // OUTER-context key encodings.
-  @Override
-  public int compare(WindowedValue<KV<K, V>> first, WindowedValue<KV<K, V>> second) {
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      keyCoder.encode(first.getValue().getKey(), buffer1, Coder.Context.OUTER);
-      keyCoder.encode(second.getValue().getKey(), buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for (int i = 0; i < len; i++) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  // Decodes both full WindowedValues (NESTED context) and delegates to compare(), i.e. a
-  // complete decode plus two key re-encodings per call.
-  @Override
-  public int compareSerialized(
-      DataInputView firstSource,
-      DataInputView secondSource) throws IOException {
-    inputWrapper.setInputView(firstSource);
-    WindowedValue<KV<K, V>> first = coder.decode(inputWrapper, Coder.Context.NESTED);
-    inputWrapper.setInputView(secondSource);
-    WindowedValue<KV<K, V>> second = coder.decode(inputWrapper, Coder.Context.NESTED);
-    return compare(first, second);
-  }
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    return false;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  // Writes the key's NESTED-context encoding, truncated or zero-padded to numBytes.
-  // NOTE(review): compare() uses the OUTER-context encoding, so the two orders can disagree;
-  // presumably harmless because supportsNormalizedKey() returns false.
-  @Override
-  public void putNormalizedKey(
-      WindowedValue<KV<K, V>> record,
-      MemorySegment target,
-      int offset,
-      int numBytes) {
-
-    buffer1.reset();
-    try {
-      keyCoder.encode(record.getValue().getKey(), buffer1, Coder.Context.NESTED);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Could not serializer " + record + " using coder " + coder + ": " + e);
-    }
-    final byte[] data = buffer1.getBuffer();
-    final int limit = offset + numBytes;
-
-    int numBytesPut = Math.min(numBytes, buffer1.size());
-
-    target.put(offset, data, 0, numBytesPut);
-
-    offset += numBytesPut;
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(
-      WindowedValue<KV<K, V>> record,
-      DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WindowedValue<KV<K, V>> readWithKeyDenormalization(
-      WindowedValue<KV<K, V>> reuse,
-      DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  // Only ascending order is produced; there is no descending variant of this comparator.
-  @Override
-  public boolean invertNormalizedKey() {
-    return false;
-  }
-
-  // New instance so each duplicate gets its own scratch buffers.
-  @Override
-  public TypeComparator<WindowedValue<KV<K, V>>> duplicate() {
-    return new KvCoderComperator<>(coder);
-  }
-
-  // Stores the decoded key object (not its encoding) in target[index].
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    WindowedValue<KV<K, V>> kv = (WindowedValue<KV<K, V>>) record;
-    K k = kv.getValue().getKey();
-    target[index] = k;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] {new CoderComparator<>(keyCoder)};
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
deleted file mode 100644
index ba53f64..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.List;
-
-/**
- * Flink {@link TypeInformation} for {@link KvCoder}. This creates special comparator
- * for {@link KV} that always compares on the key only.
- *
- * <p>Although the type reports two fields (key and value), every comparator it creates is a
- * {@link KvCoderComperator} built from the full coder. Requested key fields and sort orders
- * are ignored.
- */
-public class KvCoderTypeInformation<K, V> extends CompositeType<WindowedValue<KV<K, V>>> {
-
-  private final WindowedValue.WindowedValueCoder<KV<K, V>> coder;
-//  private KvCoder<K, V> coder;
-
-  // We don't have the Class, so we have to pass null here. What a shame...
-  private static Object dummy = new Object();
-
-  /**
-   * @param coder windowed coder whose value coder is expected to be a {@link KvCoder}
-   *     (the getters below cast it); must not be null
-   */
-  @SuppressWarnings("unchecked")
-  public KvCoderTypeInformation(WindowedValue.WindowedValueCoder<KV<K, V>> coder) {
-    super((Class) dummy.getClass());
-    this.coder = coder;
-    Preconditions.checkNotNull(coder);
-  }
-
-  // Ignores logicalKeyFields, orders and logicalFieldOffset: always compares on the key only.
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeComparator<WindowedValue<KV<K, V>>> createComparator(
-      int[] logicalKeyFields,
-      boolean[] orders,
-      int logicalFieldOffset,
-      ExecutionConfig config) {
-    return new KvCoderComperator(coder);
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 2;
-  }
-
-  // Reports Object.class (unchecked cast) because no real Class for the generic type exists.
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<WindowedValue<KV<K, V>>> getTypeClass() {
-    return privateGetTypeClass();
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <X> Class<X> privateGetTypeClass() {
-    return (Class<X>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<WindowedValue<KV<K, V>>> createSerializer(ExecutionConfig config) {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  // NOTE(review): labels itself "CoderTypeInformation" rather than "KvCoderTypeInformation".
-  @Override
-  public String toString() {
-    return "CoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
-  }
-
-  // Position 0 is the key, 1 the value; each is wrapped as a CoderTypeInformation.
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(int pos) {
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
-    if (pos == 0) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder());
-    } else if (pos == 1) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder());
-    } else {
-      throw new RuntimeException("Invalid field position " + pos);
-    }
-  }
-
-  // Accepts "key" and "value". NOTE(review): the error message for any other expression
-  // ("Only KvCoder has fields.") does not describe the actual problem (unknown field name).
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
-    switch (fieldExpression) {
-      case "key":
-        return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder());
-      case "value":
-        return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder());
-      default:
-        throw new UnsupportedOperationException("Only KvCoder has fields.");
-    }
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return new String[]{"key", "value"};
-  }
-
-  @Override
-  public int getFieldIndex(String fieldName) {
-    switch (fieldName) {
-      case "key":
-        return 0;
-      case "value":
-        return 1;
-      default:
-        return -1;
-    }
-  }
-
-  // Ignores fieldExpression and offset: always reports the key as the single flat field at
-  // position 0, which keeps keyed operations on the key only.
-  @Override
-  public void getFlatFields(
-      String fieldExpression,
-      int offset,
-      List<FlatFieldDescriptor> result) {
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
-
-    CoderTypeInformation keyTypeInfo =
-        new CoderTypeInformation<>(kvCoder.getKeyCoder());
-      result.add(new FlatFieldDescriptor(0, keyTypeInfo));
-  }
-
-  @Override
-  protected TypeComparatorBuilder<WindowedValue<KV<K, V>>> createTypeComparatorBuilder() {
-    return new KvCoderTypeComparatorBuilder();
-  }
-
-  // Inner (non-static) class so it can read the enclosing `coder`. Field registrations are
-  // ignored; it always builds the key-only KvCoderComperator.
-  private class KvCoderTypeComparatorBuilder
-      implements TypeComparatorBuilder<WindowedValue<KV<K, V>>> {
-
-    @Override
-    public void initializeTypeComparatorBuilder(int size) {}
-
-    @Override
-    public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
-
-    @Override
-    public TypeComparator<WindowedValue<KV<K, V>>> createTypeComparator(ExecutionConfig config) {
-      return new KvCoderComperator<>(coder);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
new file mode 100644
index 0000000..80d20ca
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that takes the key of a windowed {@link KV} and hands it to Flink as its
+ * {@link Coder}-encoded {@code byte} array, so that Flink groups on the encoded form instead of
+ * on the key object's own equals/hashCode.
+ */
+public class KvKeySelector<InputT, K>
+    implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {
+
+  /** Coder used to turn each key into its byte representation. */
+  private final Coder<K> keyCoder;
+
+  /** Creates a selector that encodes keys with {@code keyCoder}. */
+  public KvKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  /** Encodes the key of {@code value} with the configured key coder. */
+  @Override
+  public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
+    final KV<K, InputT> element = value.getValue();
+    final K key = element.getKey();
+    return CoderUtils.encodeToByteArray(keyCoder, key);
+  }
+
+  /** Keys are always reported as encoded byte arrays. */
+  @Override
+  public TypeInformation<byte[]> getProducedType() {
+    final TypeInformation<byte[]> encodedType = new EncodedValueTypeInformation();
+    return encodedType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
new file mode 100644
index 0000000..68ede89
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
+import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Assert;
+
+/**
+ * Test for {@link EncodedValueComparator}, driven by Flink's {@link ComparatorTestBase} over
+ * UTF-8 encoded strings listed in ascending byte order.
+ */
+public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> {
+
+  @Override
+  protected TypeComparator<byte[]> createComparator(boolean ascending) {
+    EncodedValueTypeInformation typeInfo = new EncodedValueTypeInformation();
+    return typeInfo.createComparator(ascending, new ExecutionConfig());
+  }
+
+  @Override
+  protected TypeSerializer<byte[]> createSerializer() {
+    EncodedValueTypeInformation typeInfo = new EncodedValueTypeInformation();
+    return typeInfo.createSerializer(new ExecutionConfig());
+  }
+
+  /** Byte arrays must be compared element-wise; array identity is not meaningful here. */
+  @Override
+  protected void deepEquals(String message, byte[] should, byte[] is) {
+    Assert.assertArrayEquals(message, should, is);
+  }
+
+  /** Encodes each test string in turn; the strings are already in expected sort order. */
+  @Override
+  protected byte[][] getSortedTestData() {
+    final StringUtf8Coder utf8Coder = StringUtf8Coder.of();
+    final String[] sortedValues = {
+        "",
+        "Lorem Ipsum Dolor Omit Longer",
+        "aaaa",
+        "abcd",
+        "abce",
+        "abdd",
+        "accd",
+        "bbcd"
+    };
+
+    final byte[][] encoded = new byte[sortedValues.length][];
+    try {
+      for (int i = 0; i < sortedValues.length; i++) {
+        encoded[i] = CoderUtils.encodeToByteArray(utf8Coder, sortedValues[i]);
+      }
+    } catch (CoderException e) {
+      throw new RuntimeException("Could not encode values.", e);
+    }
+    return encoded;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93ca508b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 299def7..d6e4589 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -19,14 +19,17 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -40,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -47,6 +51,11 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
@@ -57,9 +66,16 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Tests for GroupByKey.
@@ -384,4 +400,192 @@ public class GroupByKeyTest {
     assertThat(gbkDisplayData.items(), empty());
     assertThat(fewKeysDisplayData, hasDisplayItem("fewKeys", true));
   }
+
+
+  /**
+   * Checks that a runner groups keys by their encoded (coder) form rather than by the
+   * {@link Object#equals(Object)} / {@link Object#hashCode()} of the key objects themselves.
+   * {@link BadEqualityKey} deliberately breaks both methods, so the expected grouping only
+   * comes out when the runner works on encoded bytes.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
+    final int numValues = 10;
+    final int numKeys = 5;
+
+    Pipeline pipeline = TestPipeline.create();
+    pipeline.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class);
+
+    // Every key 0..numKeys-1 receives numValues elements, each carrying the value 1L.
+    List<KV<BadEqualityKey, Long>> elements = new ArrayList<>();
+    for (int round = 0; round < numValues; round++) {
+      for (int k = 0; k < numKeys; k++) {
+        elements.add(KV.of(new BadEqualityKey(k), 1L));
+      }
+    }
+
+    // Scatter the elements with a random-keyed reshuffle first; some runners would otherwise
+    // keep every element on the machine that created it, which could mask a grouping bug.
+    PCollection<KV<BadEqualityKey, Long>> created = pipeline.apply(Create.of(elements));
+    PCollection<KV<Long, KV<BadEqualityKey, Long>>> randomlyKeyed =
+        created.apply(ParDo.of(new AssignRandomKey()));
+    PCollection<KV<BadEqualityKey, Long>> shuffled = randomlyKeyed
+        .apply(Reshuffle.<Long, KV<BadEqualityKey, Long>>of())
+        .apply(Values.<KV<BadEqualityKey, Long>>create());
+
+    // Spelled-out form of what real code would write as Count.perKey(): group, then sum.
+    PCollection<KV<BadEqualityKey, Long>> counts = shuffled
+        .apply(GroupByKey.<BadEqualityKey, Long>create())
+        .apply(Combine.<BadEqualityKey, Long>groupedValues(new CountFn()));
+
+    PAssert.that(counts).satisfies(new AssertThatCountPerKeyCorrect(numValues));
+    PAssert.that(counts.apply(Keys.<BadEqualityKey>create()))
+        .satisfies(new AssertThatAllKeysExist(numKeys));
+
+    pipeline.run();
+  }
+
+  /**
+   * This is a bogus key class that returns random hash values from {@link #hashCode()} and always
+   * returns {@code false} for {@link #equals(Object)}. The results of the test are correct if
+   * the runner correctly hashes and sorts on the encoded bytes.
+   */
+  static class BadEqualityKey {
+    long key;
+
+    public BadEqualityKey() {}
+
+    public BadEqualityKey(long key) {
+      this.key = key;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return ThreadLocalRandom.current().nextInt();
+    }
+  }
+
+  /**
+   * Deterministic {@link Coder} for {@link BadEqualityKey}.
+   */
+  static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
+
+    @JsonCreator
+    public static DeterministicKeyCoder of() {
+      return INSTANCE;
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    private static final DeterministicKeyCoder INSTANCE =
+        new DeterministicKeyCoder();
+
+    private DeterministicKeyCoder() {}
+
+    @Override
+    public void encode(BadEqualityKey value, OutputStream outStream, Context context)
+        throws IOException {
+      new DataOutputStream(outStream).writeLong(value.key);
+    }
+
+    @Override
+    public BadEqualityKey decode(InputStream inStream, Context context)
+        throws IOException {
+      return new BadEqualityKey(new DataInputStream(inStream).readLong());
+    }
+  }
+
+  /**
+   * Creates a KV that wraps the original KV together with a random key.
+   */
+  static class AssignRandomKey
+      extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
+    }
+  }
+
+  /**
+   * Sums the {@code Long} values of one group. With every value equal to {@code 1L}, as in the
+   * test above, the sum equals the number of elements in the group.
+   */
+  static class CountFn implements SerializableFunction<Iterable<Long>, Long> {
+    @Override
+    public Long apply(Iterable<Long> input) {
+      long total = 0L;
+      for (Long value : input) {
+        total = total + value;
+      }
+      return total;
+    }
+  }
+
+  static class AssertThatCountPerKeyCorrect
+      implements SerializableFunction<Iterable<KV<BadEqualityKey, Long>>, Void> {
+    private final int numValues;
+
+    AssertThatCountPerKeyCorrect(int numValues) {
+      this.numValues = numValues;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<BadEqualityKey, Long>> input) {
+      for (KV<BadEqualityKey, Long> val: input) {
+        Assert.assertEquals(numValues, (long) val.getValue());
+      }
+      return null;
+    }
+  }
+
+  static class AssertThatAllKeysExist
+      implements SerializableFunction<Iterable<BadEqualityKey>, Void> {
+    private final int numKeys;
+
+    AssertThatAllKeysExist(int numKeys) {
+      this.numKeys = numKeys;
+    }
+
+    private static <T> Iterable<Object> asStructural(
+        final Iterable<T> iterable,
+        final Coder<T> coder) {
+
+      return Iterables.transform(
+          iterable,
+          new Function<T, Object>() {
+            @Override
+            public Object apply(T input) {
+              try {
+                return coder.structuralValue(input);
+              } catch (Exception e) {
+                Assert.fail("Could not structural values.");
+                throw new RuntimeException(); // to satisfy the compiler...
+              }
+            }
+          });
+
+    }
+    @Override
+    public Void apply(Iterable<BadEqualityKey> input) {
+      final DeterministicKeyCoder keyCoder = DeterministicKeyCoder.of();
+
+      List<BadEqualityKey> expectedList = new ArrayList<>();
+      for (int key = 0; key < numKeys; key++) {
+        expectedList.add(new BadEqualityKey(key));
+      }
+
+      Iterable<Object> structuralInput = asStructural(input, keyCoder);
+      Iterable<Object> structuralExpected = asStructural(expectedList, keyCoder);
+
+      for (Object expected: structuralExpected) {
+        assertThat(structuralInput, hasItem(expected));
+      }
+
+      return null;
+    }
+  }
 }


[2/2] incubator-beam git commit: This closes #409

Posted by al...@apache.org.
This closes #409


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e4d0a9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e4d0a9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e4d0a9a

Branch: refs/heads/master
Commit: 0e4d0a9ae0250863d7144ffdcc6c4e9cc099611f
Parents: 69a4141 93ca508
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Jun 19 11:35:19 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Jun 19 11:35:19 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  10 +
 .../beam/runners/flink/FlinkRunnerResult.java   |   8 +
 .../FlinkBatchTransformTranslators.java         |  49 +---
 .../FlinkBatchTranslationContext.java           |  13 +-
 .../translation/types/CoderComparator.java      | 217 ----------------
 .../translation/types/CoderTypeInformation.java |  14 +-
 .../translation/types/CoderTypeSerializer.java  |  37 +--
 .../types/EncodedValueComparator.java           | 197 ++++++++++++++
 .../types/EncodedValueSerializer.java           | 113 ++++++++
 .../types/EncodedValueTypeInformation.java      | 111 ++++++++
 .../translation/types/KvCoderComperator.java    | 259 -------------------
 .../types/KvCoderTypeInformation.java           | 207 ---------------
 .../flink/translation/types/KvKeySelector.java  |  51 ++++
 .../flink/EncodedValueComparatorTest.java       |  71 +++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 206 ++++++++++++++-
 15 files changed, 799 insertions(+), 764 deletions(-)
----------------------------------------------------------------------