You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/22 17:25:27 UTC

flink git commit: [FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Repository: flink
Updated Branches:
  refs/heads/master 7ec6d7b55 -> 08b075aa5


[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators,
many of which were already present.

This closes #1983


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08b075aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08b075aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08b075aa

Branch: refs/heads/master
Commit: 08b075aa5d5dbc90e909f1bbc3f81712c0604615
Parents: 7ec6d7b
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 4 16:56:16 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jun 22 13:23:21 2016 -0400

----------------------------------------------------------------------
 .../typeutils/base/BooleanValueComparator.java  | 146 ++++++++++++++++++
 .../typeutils/base/ByteValueComparator.java     | 146 ++++++++++++++++++
 .../typeutils/base/CharValueComparator.java     | 146 ++++++++++++++++++
 .../typeutils/base/DoubleValueComparator.java   | 148 +++++++++++++++++++
 .../typeutils/base/FloatValueComparator.java    | 148 +++++++++++++++++++
 .../typeutils/base/IntValueComparator.java      | 146 ++++++++++++++++++
 .../typeutils/base/LongValueComparator.java     | 146 ++++++++++++++++++
 .../typeutils/base/NullValueComparator.java     | 136 +++++++++++++++++
 .../typeutils/base/NullValueSerializer.java     |  82 ++++++++++
 .../typeutils/base/ShortValueComparator.java    | 146 ++++++++++++++++++
 .../typeutils/base/StringValueComparator.java   | 146 ++++++++++++++++++
 .../flink/api/java/typeutils/ValueTypeInfo.java |  86 ++++++++++-
 .../base/BooleanValueComparatorTest.java        |  44 ++++++
 .../typeutils/base/ByteValueComparatorTest.java |  66 +++++++++
 .../typeutils/base/CharValueComparatorTest.java |  62 ++++++++
 .../base/DoubleValueComparatorTest.java         |  63 ++++++++
 .../base/FloatValueComparatorTest.java          |  63 ++++++++
 .../typeutils/base/IntValueComparatorTest.java  |  66 +++++++++
 .../typeutils/base/LongValueComparatorTest.java |  65 ++++++++
 .../base/ShortValueComparatorTest.java          |  65 ++++++++
 .../base/StringValueComparatorTest.java         |  53 +++++++
 21 files changed, 2166 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
new file mode 100644
index 0000000..0a20d1d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for BooleanValue based on CopyableValueComparator.
+ */
+@Internal
+public class BooleanValueComparator extends TypeComparator<BooleanValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final BooleanValue reference = new BooleanValue();
+
+	private final BooleanValue tempReference = new BooleanValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public BooleanValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(BooleanValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(BooleanValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(BooleanValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<BooleanValue> referencedComparator) {
+		BooleanValue otherRef = ((BooleanValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(BooleanValue first, BooleanValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(BooleanValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(BooleanValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<BooleanValue> duplicate() {
+		return new BooleanValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(BooleanValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BooleanValue readWithKeyDenormalization(BooleanValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
new file mode 100644
index 0000000..a9a067a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ByteValue based on CopyableValueComparator.
+ */
+@Internal
+public class ByteValueComparator extends TypeComparator<ByteValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final ByteValue reference = new ByteValue();
+
+	private final ByteValue tempReference = new ByteValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public ByteValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(ByteValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(ByteValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(ByteValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<ByteValue> referencedComparator) {
+		ByteValue otherRef = ((ByteValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(ByteValue first, ByteValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(ByteValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(ByteValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<ByteValue> duplicate() {
+		return new ByteValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(ByteValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public ByteValue readWithKeyDenormalization(ByteValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
new file mode 100644
index 0000000..892438f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for CharValue based on CopyableValueComparator.
+ */
+@Internal
+public class CharValueComparator extends TypeComparator<CharValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final CharValue reference = new CharValue();
+
+	private final CharValue tempReference = new CharValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public CharValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(CharValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(CharValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(CharValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<CharValue> referencedComparator) {
+		CharValue otherRef = ((CharValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(CharValue first, CharValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(CharValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(CharValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<CharValue> duplicate() {
+		return new CharValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(CharValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CharValue readWithKeyDenormalization(CharValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
new file mode 100644
index 0000000..5e566f4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for DoubleValue based on CopyableValueComparator.
+ */
+@Internal
+public class DoubleValueComparator extends TypeComparator<DoubleValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final DoubleValue reference = new DoubleValue();
+
+	private final DoubleValue tempReference = new DoubleValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public DoubleValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(DoubleValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(DoubleValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(DoubleValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<DoubleValue> referencedComparator) {
+		DoubleValue otherRef = ((DoubleValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(DoubleValue first, DoubleValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(DoubleValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		NormalizableKey<?> key = (NormalizableKey<?>) reference;
+		return key.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(DoubleValue record, MemorySegment target, int offset, int numBytes) {
+		NormalizableKey<?> key = (NormalizableKey<?>) record;
+		key.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<DoubleValue> duplicate() {
+		return new DoubleValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(DoubleValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public DoubleValue readWithKeyDenormalization(DoubleValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
new file mode 100644
index 0000000..90077c6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for FloatValue based on CopyableValueComparator.
+ */
+@Internal
+public class FloatValueComparator extends TypeComparator<FloatValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final FloatValue reference = new FloatValue();
+
+	private final FloatValue tempReference = new FloatValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public FloatValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(FloatValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(FloatValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(FloatValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<FloatValue> referencedComparator) {
+		FloatValue otherRef = ((FloatValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(FloatValue first, FloatValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(FloatValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		NormalizableKey<?> key = (NormalizableKey<?>) reference;
+		return key.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(FloatValue record, MemorySegment target, int offset, int numBytes) {
+		NormalizableKey<?> key = (NormalizableKey<?>) record;
+		key.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<FloatValue> duplicate() {
+		return new FloatValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(FloatValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FloatValue readWithKeyDenormalization(FloatValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
new file mode 100644
index 0000000..e18e7dc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for IntValue based on CopyableValueComparator.
+ */
+@Internal
+public class IntValueComparator extends TypeComparator<IntValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final IntValue reference = new IntValue();
+
+	private final IntValue tempReference = new IntValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public IntValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(IntValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(IntValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(IntValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<IntValue> referencedComparator) {
+		IntValue otherRef = ((IntValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(IntValue first, IntValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(IntValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(IntValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<IntValue> duplicate() {
+		return new IntValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// serialization with key normalization is not supported
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(IntValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public IntValue readWithKeyDenormalization(IntValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
new file mode 100644
index 0000000..dfe1094
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for LongValue based on CopyableValueComparator.
+ */
+@Internal
+public class LongValueComparator extends TypeComparator<LongValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final LongValue reference = new LongValue();
+
+	private final LongValue tempReference = new LongValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public LongValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(LongValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(LongValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(LongValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<LongValue> referencedComparator) {
+		LongValue otherRef = ((LongValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(LongValue first, LongValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(LongValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(LongValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<LongValue> duplicate() {
+		return new LongValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(LongValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public LongValue readWithKeyDenormalization(LongValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
new file mode 100644
index 0000000..7212cc7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for NullValue based on CopyableValueComparator.
+ */
+@Internal
+public class NullValueComparator extends TypeComparator<NullValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	private final static NullValueComparator INSTANCE = new NullValueComparator();
+
+	public static NullValueComparator getInstance() {
+		return INSTANCE;
+	}
+
+	private NullValueComparator() {}
+
+	@Override
+	public int hash(NullValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(NullValue toCompare) {}
+
+	@Override
+	public boolean equalToReference(NullValue candidate) {
+		return true;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<NullValue> referencedComparator) {
+		return 0;
+	}
+
+	@Override
+	public int compare(NullValue first, NullValue second) {
+		return 0;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		return 0;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(NullValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return NullValue.getInstance().getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(NullValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return false;
+	}
+
+	@Override
+	public TypeComparator<NullValue> duplicate() {
+		return NullValueComparator.getInstance();
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(NullValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public NullValue readWithKeyDenormalization(NullValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
new file mode 100644
index 0000000..3303d57
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+
+@Internal
+public final class NullValueSerializer extends TypeSerializerSingleton<NullValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final NullValueSerializer INSTANCE = new NullValueSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public NullValue createInstance() {
+		return NullValue.getInstance();
+	}
+
+	@Override
+	public NullValue copy(NullValue from) {
+		return NullValue.getInstance();
+	}
+
+	@Override
+	public NullValue copy(NullValue from, NullValue reuse) {
+		return NullValue.getInstance();
+	}
+
+	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
+	public void serialize(NullValue record, DataOutputView target) throws IOException {
+	}
+
+	@Override
+	public NullValue deserialize(DataInputView source) throws IOException {
+		return NullValue.getInstance();
+	}
+
+	@Override
+	public NullValue deserialize(NullValue reuse, DataInputView source) throws IOException {
+		return NullValue.getInstance();
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof NullValueSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
new file mode 100644
index 0000000..a776b70
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.ShortValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ShortValue based on CopyableValueComparator.
+ */
+@Internal
+public class ShortValueComparator extends TypeComparator<ShortValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final ShortValue reference = new ShortValue();
+
+	private final ShortValue tempReference = new ShortValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public ShortValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(ShortValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(ShortValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(ShortValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<ShortValue> referencedComparator) {
+		ShortValue otherRef = ((ShortValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(ShortValue first, ShortValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(ShortValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(ShortValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<ShortValue> duplicate() {
+		return new ShortValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(ShortValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public ShortValue readWithKeyDenormalization(ShortValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
new file mode 100644
index 0000000..c264901
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.StringValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for StringValue based on CopyableValueComparator.
+ */
+@Internal
+public class StringValueComparator extends TypeComparator<StringValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final StringValue reference = new StringValue();
+
+	private final StringValue tempReference = new StringValue();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public StringValueComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(StringValue record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(StringValue toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(StringValue candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<StringValue> referencedComparator) {
+		StringValue otherRef = ((StringValueComparator) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(StringValue first, StringValue second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(StringValue.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(StringValue record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<StringValue> duplicate() {
+		return new StringValueComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(StringValue record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public StringValue readWithKeyDenormalization(StringValue reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 495a324..5a8334f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -26,6 +26,26 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanValueComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanValueSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteValueComparator;
+import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
+import org.apache.flink.api.common.typeutils.base.CharValueComparator;
+import org.apache.flink.api.common.typeutils.base.CharValueSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleValueComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatValueComparator;
+import org.apache.flink.api.common.typeutils.base.FloatValueSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueComparator;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
+import org.apache.flink.api.common.typeutils.base.LongValueComparator;
+import org.apache.flink.api.common.typeutils.base.LongValueSerializer;
+import org.apache.flink.api.common.typeutils.base.NullValueComparator;
+import org.apache.flink.api.common.typeutils.base.NullValueSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortValueComparator;
+import org.apache.flink.api.common.typeutils.base.ShortValueSerializer;
+import org.apache.flink.api.common.typeutils.base.StringValueComparator;
+import org.apache.flink.api.common.typeutils.base.StringValueSerializer;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
@@ -126,7 +146,37 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 	@SuppressWarnings("unchecked")
 	@PublicEvolving
 	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		if (CopyableValue.class.isAssignableFrom(type)) {
+		if (BooleanValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) BooleanValueSerializer.INSTANCE;
+		}
+		else if (ByteValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) ByteValueSerializer.INSTANCE;
+		}
+		else if (CharValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) CharValueSerializer.INSTANCE;
+		}
+		else if (DoubleValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) DoubleValueSerializer.INSTANCE;
+		}
+		else if (FloatValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) FloatValueSerializer.INSTANCE;
+		}
+		else if (IntValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) IntValueSerializer.INSTANCE;
+		}
+		else if (LongValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) LongValueSerializer.INSTANCE;
+		}
+		else if (NullValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) NullValueSerializer.INSTANCE;
+		}
+		else if (ShortValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) ShortValueSerializer.INSTANCE;
+		}
+		else if (StringValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<T>) StringValueSerializer.INSTANCE;
+		}
+		else if (CopyableValue.class.isAssignableFrom(type)) {
 			return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
 		}
 		else {
@@ -141,8 +191,38 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 		if (!isKeyType()) {
 			throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
 		}
-		
-		if (CopyableValue.class.isAssignableFrom(type)) {
+
+		if (BooleanValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new BooleanValueComparator(sortOrderAscending);
+		}
+		else if (ByteValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new ByteValueComparator(sortOrderAscending);
+		}
+		else if (CharValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new CharValueComparator(sortOrderAscending);
+		}
+		else if (DoubleValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new DoubleValueComparator(sortOrderAscending);
+		}
+		else if (FloatValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new FloatValueComparator(sortOrderAscending);
+		}
+		else if (IntValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new IntValueComparator(sortOrderAscending);
+		}
+		else if (LongValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new LongValueComparator(sortOrderAscending);
+		}
+		else if (NullValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) NullValueComparator.getInstance();
+		}
+		else if (ShortValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new ShortValueComparator(sortOrderAscending);
+		}
+		else if (StringValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<T>) new StringValueComparator(sortOrderAscending);
+		}
+		else if (CopyableValue.class.isAssignableFrom(type)) {
 			return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
new file mode 100644
index 0000000..62ca95d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+public class BooleanValueComparatorTest extends ComparatorTestBase<BooleanValue> {
+
+	@Override
+	protected TypeComparator<BooleanValue> createComparator(boolean ascending) {
+		return new BooleanValueComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<BooleanValue> createSerializer() {
+		return new BooleanValueSerializer();
+	}
+
+	@Override
+	protected BooleanValue[] getSortedTestData() {
+		return new BooleanValue[]{BooleanValue.FALSE, BooleanValue.TRUE};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
new file mode 100644
index 0000000..f14e3d2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link ByteValueComparator}, using {@link ByteValueSerializer} for serialization.
+ */
+public class ByteValueComparatorTest extends ComparatorTestBase<ByteValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<ByteValue> createComparator(boolean ascending) {
+		return new ByteValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link ByteValue}. */
+	@Override
+	protected TypeSerializer<ByteValue> createSerializer() {
+		return new ByteValueSerializer();
+	}
+
+	/**
+	 * Returns byte values in ascending order: both extremes, the small values around
+	 * zero, and one pseudo-random magnitude used with either sign.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected ByteValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		// nextInt(bound) returns a value in [0, bound), so the result is never negative
+		// and never equal to Byte.MAX_VALUE; only the lower end needs adjusting.
+		int rndByte = rnd.nextInt(Byte.MAX_VALUE);
+		if (rndByte <= 2) {
+			// keep the random magnitude clear of the fixed entries -1, 0, 1 and 2
+			rndByte += 3;
+		}
+		return new ByteValue[]{
+			new ByteValue(Byte.MIN_VALUE),
+			new ByteValue((byte) -rndByte),
+			new ByteValue((byte) -1),
+			new ByteValue((byte) 0),
+			new ByteValue((byte) 1),
+			new ByteValue((byte) 2),
+			new ByteValue((byte) rndByte),
+			new ByteValue(Byte.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
new file mode 100644
index 0000000..f952fc4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link CharValueComparator}, using {@link CharValueSerializer} for serialization.
+ */
+public class CharValueComparatorTest extends ComparatorTestBase<CharValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<CharValue> createComparator(boolean ascending) {
+		return new CharValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link CharValue}. */
+	@Override
+	protected TypeSerializer<CharValue> createSerializer() {
+		return new CharValueSerializer();
+	}
+
+	/**
+	 * Returns char values in ascending order: the smallest char, a pseudo-random char,
+	 * its direct successor, and the largest char.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected CharValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		// nextInt(bound) returns a value in [0, bound): never negative, never MAX_VALUE
+		int rndChar = rnd.nextInt(Character.MAX_VALUE);
+		if (rndChar == Character.MIN_VALUE) {
+			// would duplicate the first entry
+			rndChar += 2;
+		}
+		if (rndChar >= Character.MAX_VALUE - 1) {
+			// rndChar + 1 is emitted as well and must stay below the final MAX_VALUE entry
+			rndChar -= 2;
+		}
+		return new CharValue[]{
+			new CharValue(Character.MIN_VALUE),
+			new CharValue((char) rndChar),
+			new CharValue((char) (rndChar + 1)),
+			new CharValue(Character.MAX_VALUE)
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
new file mode 100644
index 0000000..b50ceca
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link DoubleValueComparator}, using {@link DoubleValueSerializer} for serialization.
+ */
+public class DoubleValueComparatorTest extends ComparatorTestBase<DoubleValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<DoubleValue> createComparator(boolean ascending) {
+		return new DoubleValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link DoubleValue}. */
+	@Override
+	protected TypeSerializer<DoubleValue> createSerializer() {
+		return new DoubleValueSerializer();
+	}
+
+	/**
+	 * Returns double values in ascending order: a few fixed values around zero, one
+	 * pseudo-random magnitude used with either sign, and {@link Double#MAX_VALUE}.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected DoubleValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		// nextDouble() returns a value in [0.0, 1.0); shifting it by 3 puts the random
+		// entry above the fixed 2.0 and its negation below the fixed -1.0. No further
+		// range checks are needed because the result always lies in [3.0, 4.0).
+		double rndDouble = rnd.nextDouble() + 3;
+		return new DoubleValue[]{
+			new DoubleValue(-rndDouble),
+			new DoubleValue(-1.0D),
+			new DoubleValue(0.0D),
+			new DoubleValue(2.0D),
+			new DoubleValue(rndDouble),
+			new DoubleValue(Double.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
new file mode 100644
index 0000000..67c5ba9
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link FloatValueComparator}, using {@link FloatValueSerializer} for serialization.
+ */
+public class FloatValueComparatorTest extends ComparatorTestBase<FloatValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<FloatValue> createComparator(boolean ascending) {
+		return new FloatValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link FloatValue}. */
+	@Override
+	protected TypeSerializer<FloatValue> createSerializer() {
+		return new FloatValueSerializer();
+	}
+
+	/**
+	 * Returns float values in ascending order: a few fixed values around zero, one
+	 * pseudo-random magnitude used with either sign, and {@link Float#MAX_VALUE}.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected FloatValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		// nextFloat() returns a value in [0.0, 1.0); shifting it by 3 puts the random
+		// entry above the fixed 2.0 and its negation below the fixed -1.0. No further
+		// range checks are needed because the result always lies in [3.0, 4.0].
+		float rndFloat = rnd.nextFloat() + 3;
+		return new FloatValue[]{
+			new FloatValue(-rndFloat),
+			new FloatValue(-1.0F),
+			new FloatValue(0.0F),
+			new FloatValue(2.0F),
+			new FloatValue(rndFloat),
+			new FloatValue(Float.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
new file mode 100644
index 0000000..037eb56
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link IntValueComparator}, using {@link IntValueSerializer} for serialization.
+ */
+public class IntValueComparatorTest extends ComparatorTestBase<IntValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<IntValue> createComparator(boolean ascending) {
+		return new IntValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link IntValue}. */
+	@Override
+	protected TypeSerializer<IntValue> createSerializer() {
+		return new IntValueSerializer();
+	}
+
+	/**
+	 * Returns int values in ascending order: both extremes, the small values around
+	 * zero, and one pseudo-random magnitude used with either sign.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected IntValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		int rndInt = rnd.nextInt();
+		if (rndInt < 0) {
+			// -Integer.MIN_VALUE overflows back to MIN_VALUE, so clamp before negating
+			rndInt = -Math.max(rndInt, -Integer.MAX_VALUE);
+		}
+		if (rndInt == Integer.MAX_VALUE) {
+			// would duplicate the last entry
+			rndInt -= 3;
+		}
+		if (rndInt <= 2) {
+			// keep the random magnitude clear of the fixed entries -1, 0, 1 and 2
+			rndInt += 3;
+		}
+		return new IntValue[]{
+			new IntValue(Integer.MIN_VALUE),
+			new IntValue(-rndInt),
+			new IntValue(-1),
+			new IntValue(0),
+			new IntValue(1),
+			new IntValue(2),
+			new IntValue(rndInt),
+			new IntValue(Integer.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
new file mode 100644
index 0000000..6871a6b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link LongValueComparator}, using {@link LongValueSerializer} for serialization.
+ */
+public class LongValueComparatorTest extends ComparatorTestBase<LongValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<LongValue> createComparator(boolean ascending) {
+		return new LongValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link LongValue}. */
+	@Override
+	protected TypeSerializer<LongValue> createSerializer() {
+		return new LongValueSerializer();
+	}
+
+	/**
+	 * Returns long values in ascending order: both extremes, the small values around
+	 * zero, and one pseudo-random magnitude used with either sign.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected LongValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		long rndLong = rnd.nextLong();
+		if (rndLong < 0) {
+			// -Long.MIN_VALUE overflows back to MIN_VALUE, so clamp before negating
+			rndLong = -Math.max(rndLong, -Long.MAX_VALUE);
+		}
+		if (rndLong == Long.MAX_VALUE) {
+			// would duplicate the last entry
+			rndLong -= 3;
+		}
+		if (rndLong <= 2) {
+			// keep the random magnitude clear of the fixed entries -1, 0, 1 and 2
+			rndLong += 3;
+		}
+		return new LongValue[]{
+			new LongValue(Long.MIN_VALUE),
+			new LongValue(-rndLong),
+			new LongValue(-1L),
+			new LongValue(0L),
+			new LongValue(1L),
+			new LongValue(2L),
+			new LongValue(rndLong),
+			new LongValue(Long.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
new file mode 100644
index 0000000..7d4a3a0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+import java.util.Random;
+
+/**
+ * Runs the checks inherited from {@link ComparatorTestBase} against
+ * {@link ShortValueComparator}, using {@link ShortValueSerializer} for serialization.
+ */
+public class ShortValueComparatorTest extends ComparatorTestBase<ShortValue> {
+
+	/** Creates the comparator under test with the requested sort direction. */
+	@Override
+	protected TypeComparator<ShortValue> createComparator(boolean ascending) {
+		return new ShortValueComparator(ascending);
+	}
+
+	/** Creates the serializer for {@link ShortValue}. */
+	@Override
+	protected TypeSerializer<ShortValue> createSerializer() {
+		return new ShortValueSerializer();
+	}
+
+	/**
+	 * Returns short values in ascending order: both extremes, the small values around
+	 * zero, and one pseudo-random magnitude used with either sign.
+	 *
+	 * <p>The generator is seeded with a constant, so every run sees the same data.
+	 */
+	@Override
+	protected ShortValue[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		// Truncate the random int to short range, but keep working in int so that
+		// negating Short.MIN_VALUE cannot wrap back to a negative value.
+		int rndShort = (short) rnd.nextInt();
+		if (rndShort < 0) {
+			rndShort = Math.min(-rndShort, Short.MAX_VALUE);
+		}
+		if (rndShort == Short.MAX_VALUE) {
+			// would duplicate the last entry
+			rndShort -= 3;
+		}
+		if (rndShort <= 2) {
+			// keep the random magnitude clear of the fixed entries -1, 0, 1 and 2
+			rndShort += 3;
+		}
+		return new ShortValue[]{
+			new ShortValue(Short.MIN_VALUE),
+			new ShortValue((short) -rndShort),
+			new ShortValue((short) -1),
+			new ShortValue((short) 0),
+			new ShortValue((short) 1),
+			new ShortValue((short) 2),
+			new ShortValue((short) rndShort),
+			new ShortValue(Short.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
new file mode 100644
index 0000000..e0b2f70
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Plugs {@link StringValueComparator} and {@link StringValueSerializer} into the
+ * checks inherited from {@link ComparatorTestBase}.
+ */
+public class StringValueComparatorTest extends ComparatorTestBase<StringValue> {
+
+	/** Creates the comparator under test with the sort direction chosen by the caller. */
+	@Override
+	protected TypeComparator<StringValue> createComparator(boolean ascending) {
+		final TypeComparator<StringValue> comparator = new StringValueComparator(ascending);
+		return comparator;
+	}
+
+	/** Creates the serializer for {@link StringValue}. */
+	@Override
+	protected TypeSerializer<StringValue> createSerializer() {
+		final TypeSerializer<StringValue> serializer = new StringValueSerializer();
+		return serializer;
+	}
+
+	/**
+	 * Returns the test strings wrapped as {@link StringValue}s, in the order listed.
+	 *
+	 * <p>The capitalised entry precedes the lowercase ones because {@code 'L'} has a
+	 * smaller character code than {@code 'a'}.
+	 */
+	@Override
+	protected StringValue[] getSortedTestData() {
+		final String[] ascending = {
+			"",
+			"Lorem Ipsum Dolor Omit Longer",
+			"aaaa",
+			"abcd",
+			"abce",
+			"abdd",
+			"accd",
+			"bbcd"
+		};
+		final StringValue[] sorted = new StringValue[ascending.length];
+		for (int i = 0; i < ascending.length; i++) {
+			sorted[i] = new StringValue(ascending[i]);
+		}
+		return sorted;
+	}
+}