You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/22 17:25:27 UTC
flink git commit: [FLINK-3868] [core] Specialized CopyableValue
serializers and comparators
Repository: flink
Updated Branches:
refs/heads/master 7ec6d7b55 -> 08b075aa5
[FLINK-3868] [core] Specialized CopyableValue serializers and comparators
Update ValueTypeInfo to use specialized serializers and comparators,
many of which were already present.
This closes #1983
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08b075aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08b075aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08b075aa
Branch: refs/heads/master
Commit: 08b075aa5d5dbc90e909f1bbc3f81712c0604615
Parents: 7ec6d7b
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 4 16:56:16 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jun 22 13:23:21 2016 -0400
----------------------------------------------------------------------
.../typeutils/base/BooleanValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/ByteValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/CharValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/DoubleValueComparator.java | 148 +++++++++++++++++++
.../typeutils/base/FloatValueComparator.java | 148 +++++++++++++++++++
.../typeutils/base/IntValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/LongValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/NullValueComparator.java | 136 +++++++++++++++++
.../typeutils/base/NullValueSerializer.java | 82 ++++++++++
.../typeutils/base/ShortValueComparator.java | 146 ++++++++++++++++++
.../typeutils/base/StringValueComparator.java | 146 ++++++++++++++++++
.../flink/api/java/typeutils/ValueTypeInfo.java | 86 ++++++++++-
.../base/BooleanValueComparatorTest.java | 44 ++++++
.../typeutils/base/ByteValueComparatorTest.java | 66 +++++++++
.../typeutils/base/CharValueComparatorTest.java | 62 ++++++++
.../base/DoubleValueComparatorTest.java | 63 ++++++++
.../base/FloatValueComparatorTest.java | 63 ++++++++
.../typeutils/base/IntValueComparatorTest.java | 66 +++++++++
.../typeutils/base/LongValueComparatorTest.java | 65 ++++++++
.../base/ShortValueComparatorTest.java | 65 ++++++++
.../base/StringValueComparatorTest.java | 53 +++++++
21 files changed, 2166 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
new file mode 100644
index 0000000..0a20d1d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for BooleanValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to BooleanValue.compareTo.
+ */
+@Internal
+public class BooleanValueComparator extends TypeComparator<BooleanValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final BooleanValue reference = new BooleanValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final BooleanValue tempReference = new BooleanValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public BooleanValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(BooleanValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(BooleanValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(BooleanValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<BooleanValue> referencedComparator) {
+ BooleanValue otherRef = ((BooleanValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(BooleanValue first, BooleanValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(BooleanValue.class); // true exactly when BooleanValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(BooleanValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<BooleanValue> duplicate() {
+ return new BooleanValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(BooleanValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public BooleanValue readWithKeyDenormalization(BooleanValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
new file mode 100644
index 0000000..a9a067a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ByteValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to ByteValue.compareTo.
+ */
+@Internal
+public class ByteValueComparator extends TypeComparator<ByteValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final ByteValue reference = new ByteValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final ByteValue tempReference = new ByteValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public ByteValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(ByteValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(ByteValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(ByteValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<ByteValue> referencedComparator) {
+ ByteValue otherRef = ((ByteValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(ByteValue first, ByteValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(ByteValue.class); // true exactly when ByteValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(ByteValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<ByteValue> duplicate() {
+ return new ByteValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(ByteValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public ByteValue readWithKeyDenormalization(ByteValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
new file mode 100644
index 0000000..892438f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for CharValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to CharValue.compareTo.
+ */
+@Internal
+public class CharValueComparator extends TypeComparator<CharValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final CharValue reference = new CharValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final CharValue tempReference = new CharValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public CharValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(CharValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(CharValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(CharValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<CharValue> referencedComparator) {
+ CharValue otherRef = ((CharValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(CharValue first, CharValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(CharValue.class); // true exactly when CharValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(CharValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<CharValue> duplicate() {
+ return new CharValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(CharValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public CharValue readWithKeyDenormalization(CharValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
new file mode 100644
index 0000000..5e566f4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for DoubleValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to DoubleValue.compareTo.
+ */
+@Internal
+public class DoubleValueComparator extends TypeComparator<DoubleValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final DoubleValue reference = new DoubleValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final DoubleValue tempReference = new DoubleValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public DoubleValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(DoubleValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(DoubleValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(DoubleValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<DoubleValue> referencedComparator) {
+ DoubleValue otherRef = ((DoubleValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(DoubleValue first, DoubleValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(DoubleValue.class); // true exactly when DoubleValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ NormalizableKey<?> key = (NormalizableKey<?>) reference; // NOTE(review): runtime cast; throws ClassCastException if DoubleValue is not a NormalizableKey - presumably only called when supportsNormalizedKey() is true
+ return key.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(DoubleValue record, MemorySegment target, int offset, int numBytes) {
+ NormalizableKey<?> key = (NormalizableKey<?>) record; // NOTE(review): same runtime cast as in getNormalizeKeyLen(); fails if the record is not a NormalizableKey
+ key.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<DoubleValue> duplicate() {
+ return new DoubleValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(DoubleValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public DoubleValue readWithKeyDenormalization(DoubleValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
new file mode 100644
index 0000000..90077c6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for FloatValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to FloatValue.compareTo.
+ */
+@Internal
+public class FloatValueComparator extends TypeComparator<FloatValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final FloatValue reference = new FloatValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final FloatValue tempReference = new FloatValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public FloatValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(FloatValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(FloatValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(FloatValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<FloatValue> referencedComparator) {
+ FloatValue otherRef = ((FloatValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(FloatValue first, FloatValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(FloatValue.class); // true exactly when FloatValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ NormalizableKey<?> key = (NormalizableKey<?>) reference; // NOTE(review): runtime cast; throws ClassCastException if FloatValue is not a NormalizableKey - presumably only called when supportsNormalizedKey() is true
+ return key.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(FloatValue record, MemorySegment target, int offset, int numBytes) {
+ NormalizableKey<?> key = (NormalizableKey<?>) record; // NOTE(review): same runtime cast as in getNormalizeKeyLen(); fails if the record is not a NormalizableKey
+ key.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<FloatValue> duplicate() {
+ return new FloatValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(FloatValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public FloatValue readWithKeyDenormalization(FloatValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
new file mode 100644
index 0000000..e18e7dc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for IntValue based on CopyableValueComparator; the sort direction is fixed at construction and ordering is delegated to IntValue.compareTo.
+ */
+@Internal
+public class IntValueComparator extends TypeComparator<IntValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison; // true: use compareTo results as-is; false: negate them
+
+ private final IntValue reference = new IntValue(); // target of setReference(); also reused as the first operand in compareSerialized()
+
+ private final IntValue tempReference = new IntValue(); // scratch object for the second operand in compareSerialized()
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; // single-element array handed out by getFlatComparators()
+
+ public IntValueComparator(boolean ascending) { // ascending: false reverses the result of every comparison
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(IntValue record) {
+ return record.hashCode(); // hashing is delegated to the value's own hashCode()
+ }
+
+ @Override
+ public void setReference(IntValue toCompare) {
+ toCompare.copyTo(reference); // copyTo() targets the pre-allocated reference; the caller's instance is not retained
+ }
+
+ @Override
+ public boolean equalToReference(IntValue candidate) {
+ return candidate.equals(this.reference); // equality via equals(); the sort direction plays no role
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<IntValue> referencedComparator) {
+ IntValue otherRef = ((IntValueComparator) referencedComparator).reference; // fails with ClassCastException for any other comparator type
+ int comp = otherRef.compareTo(reference); // operand order: the OTHER comparator's reference is the receiver
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compare(IntValue first, IntValue second) {
+ int comp = first.compareTo(second); // natural order of the two values
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource); // NOTE(review): reads into reference, so a value stored earlier via setReference() is presumably replaced
+ tempReference.read(secondSource); // second operand goes into the scratch object
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp; // negated for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(IntValue.class); // true exactly when IntValue implements NormalizableKey
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen(); // length is queried from the reference instance
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // fewer bytes than the full key length means only a prefix is stored
+ }
+
+ @Override
+ public void putNormalizedKey(IntValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes); // the record writes its own key; arguments are passed through unchanged
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison; // true only for descending comparators
+ }
+
+ @Override
+ public TypeComparator<IntValue> duplicate() {
+ return new IntValueComparator(ascendingComparison); // same direction; the new instance allocates its own reference objects
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record; // the record itself is stored as the single key
+ return 1; // number of keys written
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators; // the array created at construction, containing only this comparator
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization (the write/read methods below always throw)
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false; // matches the two unsupported methods that follow
+ }
+
+ @Override
+ public void writeWithKeyNormalization(IntValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+
+ @Override
+ public IntValue readWithKeyDenormalization(IntValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(); // not supported, see supportsSerializationWithKeyNormalization()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
new file mode 100644
index 0000000..dfe1094
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for LongValue based on CopyableValueComparator.
+ */
+@Internal
+public class LongValueComparator extends TypeComparator<LongValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final LongValue reference = new LongValue();
+
+ private final LongValue tempReference = new LongValue();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public LongValueComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(LongValue record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(LongValue toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(LongValue candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<LongValue> referencedComparator) {
+ LongValue otherRef = ((LongValueComparator) referencedComparator).reference;
+ int comp = otherRef.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(LongValue first, LongValue second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource);
+ tempReference.read(secondSource);
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(LongValue.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(LongValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<LongValue> duplicate() {
+ return new LongValueComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(LongValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LongValue readWithKeyDenormalization(LongValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
new file mode 100644
index 0000000..7212cc7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for NullValue based on CopyableValueComparator.
+ *
+ * <p>Every comparison reports equality: {@code compare}, {@code compareToReference} and
+ * {@code compareSerialized} return {@code 0} and {@code equalToReference} returns
+ * {@code true}. The class is used through the shared instance from {@link #getInstance()}.
+ */
+@Internal
+public class NullValueComparator extends TypeComparator<NullValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** One-element array holding this comparator, handed out by getFlatComparators. */
+    private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+    /** The shared comparator instance. */
+    private static final NullValueComparator INSTANCE = new NullValueComparator();
+
+    /** Returns the shared comparator instance. */
+    public static NullValueComparator getInstance() {
+        return INSTANCE;
+    }
+
+    /** Private: instances are obtained through {@link #getInstance()}. */
+    private NullValueComparator() {}
+
+    /** Returns {@code record.hashCode()}. */
+    @Override
+    public int hash(NullValue record) {
+        return record.hashCode();
+    }
+
+    /** Does nothing; no reference value is stored. */
+    @Override
+    public void setReference(NullValue toCompare) {}
+
+    /** Always {@code true}. */
+    @Override
+    public boolean equalToReference(NullValue candidate) {
+        return true;
+    }
+
+    /** Always {@code 0}. */
+    @Override
+    public int compareToReference(TypeComparator<NullValue> referencedComparator) {
+        return 0;
+    }
+
+    /** Always {@code 0}. */
+    @Override
+    public int compare(NullValue first, NullValue second) {
+        return 0;
+    }
+
+    /** Always {@code 0}; neither input view is read. */
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+        return 0;
+    }
+
+    /** Returns whether NullValue is assignable to NormalizableKey. */
+    @Override
+    public boolean supportsNormalizedKey() {
+        return NormalizableKey.class.isAssignableFrom(NullValue.class);
+    }
+
+    /** Returns the maximum normalized key length reported by {@code NullValue.getInstance()}. */
+    @Override
+    public int getNormalizeKeyLen() {
+        return NullValue.getInstance().getMaxNormalizedKeyLen();
+    }
+
+    /** Returns {@code true} if {@code keyBytes} is smaller than {@link #getNormalizeKeyLen()}. */
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        final int fullKeyLength = getNormalizeKeyLen();
+        return keyBytes < fullKeyLength;
+    }
+
+    /** Delegates to {@code record.copyNormalizedKey(target, offset, numBytes)}. */
+    @Override
+    public void putNormalizedKey(NullValue record, MemorySegment target, int offset, int numBytes) {
+        record.copyNormalizedKey(target, offset, numBytes);
+    }
+
+    /** Always {@code false}; this comparator has no sort direction. */
+    @Override
+    public boolean invertNormalizedKey() {
+        return false;
+    }
+
+    /** Returns the shared instance instead of creating a copy. */
+    @Override
+    public TypeComparator<NullValue> duplicate() {
+        return INSTANCE;
+    }
+
+    /** Stores {@code record} itself at {@code target[index]} and reports one key. */
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    /** Returns the stored one-element comparator array (not a copy). */
+    @Override
+    public TypeComparator<?>[] getFlatComparators() {
+        return comparators;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // unsupported normalization
+    // --------------------------------------------------------------------------------------------
+
+    /** Always {@code false}: key-normalized serialization is not offered. */
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    /** Not supported; throws {@link UnsupportedOperationException} on every call. */
+    @Override
+    public void writeWithKeyNormalization(NullValue record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; throws {@link UnsupportedOperationException} on every call. */
+    @Override
+    public NullValue readWithKeyDenormalization(NullValue reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
new file mode 100644
index 0000000..3303d57
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+
+/**
+ * Serializer for NullValue.
+ *
+ * <p>No bytes are written or read: {@code serialize} and the stream-to-stream {@code copy}
+ * are empty, {@code getLength} is {@code 0}, and every method that yields a value returns
+ * {@code NullValue.getInstance()}.
+ */
+@Internal
+public final class NullValueSerializer extends TypeSerializerSingleton<NullValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Shared serializer instance. */
+    public static final NullValueSerializer INSTANCE = new NullValueSerializer();
+
+    /** Always {@code false}. */
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    /** Returns {@code NullValue.getInstance()}. */
+    @Override
+    public NullValue createInstance() {
+        return NullValue.getInstance();
+    }
+
+    /** Ignores {@code from} and returns {@code NullValue.getInstance()}. */
+    @Override
+    public NullValue copy(NullValue from) {
+        return NullValue.getInstance();
+    }
+
+    /** Ignores {@code reuse}; same result as {@link #copy(NullValue)}. */
+    @Override
+    public NullValue copy(NullValue from, NullValue reuse) {
+        return copy(from);
+    }
+
+    /** Serialized length in bytes: always {@code 0}. */
+    @Override
+    public int getLength() {
+        return 0;
+    }
+
+    /** Writes nothing to {@code target}. */
+    @Override
+    public void serialize(NullValue record, DataOutputView target) throws IOException {
+        // intentionally empty
+    }
+
+    /** Reads nothing from {@code source} and returns {@code NullValue.getInstance()}. */
+    @Override
+    public NullValue deserialize(DataInputView source) throws IOException {
+        return NullValue.getInstance();
+    }
+
+    /** Ignores {@code reuse}; same result as {@link #deserialize(DataInputView)}. */
+    @Override
+    public NullValue deserialize(NullValue reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    /** Transfers nothing between the two views. */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        // intentionally empty
+    }
+
+    /** Returns whether {@code obj} is a {@code NullValueSerializer}. */
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof NullValueSerializer;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
new file mode 100644
index 0000000..a776b70
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.ShortValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ShortValue based on CopyableValueComparator.
+ *
+ * <p>A single mutable {@code reference} value backs {@code setReference},
+ * {@code equalToReference} and {@code compareToReference}. {@code compareSerialized}
+ * deserializes its first argument into that same instance, so a reference stored earlier
+ * is overwritten by a serialized comparison.
+ */
+@Internal
+public class ShortValueComparator extends TypeComparator<ShortValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** When {@code false}, every comparison result is negated before it is returned. */
+    private final boolean ascendingComparison;
+
+    /** Value filled by setReference; also receives the first record in compareSerialized. */
+    private final ShortValue reference = new ShortValue();
+
+    /** Scratch value that receives the second record in compareSerialized. */
+    private final ShortValue tempReference = new ShortValue();
+
+    /** One-element array holding this comparator, handed out by getFlatComparators. */
+    private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+    /**
+     * @param ascending {@code true} to return {@code compareTo} results unchanged,
+     *     {@code false} to return them negated
+     */
+    public ShortValueComparator(boolean ascending) {
+        this.ascendingComparison = ascending;
+    }
+
+    /** Returns {@code record.hashCode()}. */
+    @Override
+    public int hash(ShortValue record) {
+        return record.hashCode();
+    }
+
+    /** Copies {@code toCompare} into the internal reference value. */
+    @Override
+    public void setReference(ShortValue toCompare) {
+        toCompare.copyTo(reference);
+    }
+
+    /** Returns whether {@code candidate} equals the internal reference value. */
+    @Override
+    public boolean equalToReference(ShortValue candidate) {
+        return candidate.equals(reference);
+    }
+
+    /**
+     * Compares the other comparator's reference against this comparator's reference.
+     *
+     * @param referencedComparator must be a {@code ShortValueComparator}; it is cast without a check
+     * @return the other reference compared to this one, negated for descending order
+     */
+    @Override
+    public int compareToReference(TypeComparator<ShortValue> referencedComparator) {
+        final ShortValueComparator other = (ShortValueComparator) referencedComparator;
+        return applySortOrder(other.reference.compareTo(reference));
+    }
+
+    /** Returns {@code first.compareTo(second)}, negated for descending order. */
+    @Override
+    public int compare(ShortValue first, ShortValue second) {
+        return applySortOrder(first.compareTo(second));
+    }
+
+    /**
+     * Reads one value from each view and compares them.
+     *
+     * <p>The first value is read into {@code reference}, replacing whatever
+     * {@code setReference} stored there; the second is read into {@code tempReference}.
+     */
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+        reference.read(firstSource);
+        tempReference.read(secondSource);
+        return applySortOrder(reference.compareTo(tempReference));
+    }
+
+    /** Returns whether ShortValue is assignable to NormalizableKey. */
+    @Override
+    public boolean supportsNormalizedKey() {
+        return NormalizableKey.class.isAssignableFrom(ShortValue.class);
+    }
+
+    /** Returns the maximum normalized key length reported by the reference value. */
+    @Override
+    public int getNormalizeKeyLen() {
+        return reference.getMaxNormalizedKeyLen();
+    }
+
+    /** Returns {@code true} if {@code keyBytes} is smaller than {@link #getNormalizeKeyLen()}. */
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        final int fullKeyLength = getNormalizeKeyLen();
+        return keyBytes < fullKeyLength;
+    }
+
+    /** Delegates to {@code record.copyNormalizedKey(target, offset, numBytes)}. */
+    @Override
+    public void putNormalizedKey(ShortValue record, MemorySegment target, int offset, int numBytes) {
+        record.copyNormalizedKey(target, offset, numBytes);
+    }
+
+    /** Returns {@code true} exactly when this comparator is descending. */
+    @Override
+    public boolean invertNormalizedKey() {
+        return !ascendingComparison;
+    }
+
+    /** Creates a new comparator with the same sort direction. */
+    @Override
+    public TypeComparator<ShortValue> duplicate() {
+        return new ShortValueComparator(ascendingComparison);
+    }
+
+    /** Stores {@code record} itself at {@code target[index]} and reports one key. */
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    /** Returns the stored one-element comparator array (not a copy). */
+    @Override
+    public TypeComparator<?>[] getFlatComparators() {
+        return comparators;
+    }
+
+    /** Returns {@code naturalResult} for ascending order and its negation otherwise. */
+    private int applySortOrder(int naturalResult) {
+        if (ascendingComparison) {
+            return naturalResult;
+        }
+        return -naturalResult;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // unsupported normalization
+    // --------------------------------------------------------------------------------------------
+
+    /** Always {@code false}: key-normalized serialization is not offered. */
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    /** Not supported; throws {@link UnsupportedOperationException} on every call. */
+    @Override
+    public void writeWithKeyNormalization(ShortValue record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported; throws {@link UnsupportedOperationException} on every call. */
+    @Override
+    public ShortValue readWithKeyDenormalization(ShortValue reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
new file mode 100644
index 0000000..c264901
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.StringValue;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for StringValue based on CopyableValueComparator.
+ */
+@Internal
+public class StringValueComparator extends TypeComparator<StringValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final StringValue reference = new StringValue();
+
+ private final StringValue tempReference = new StringValue();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public StringValueComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(StringValue record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(StringValue toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(StringValue candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<StringValue> referencedComparator) {
+ StringValue otherRef = ((StringValueComparator) referencedComparator).reference;
+ int comp = otherRef.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(StringValue first, StringValue second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ reference.read(firstSource);
+ tempReference.read(secondSource);
+ int comp = reference.compareTo(tempReference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(StringValue.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(StringValue record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<StringValue> duplicate() {
+ return new StringValueComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(StringValue record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StringValue readWithKeyDenormalization(StringValue reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 495a324..5a8334f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -26,6 +26,26 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanValueComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanValueSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteValueComparator;
+import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
+import org.apache.flink.api.common.typeutils.base.CharValueComparator;
+import org.apache.flink.api.common.typeutils.base.CharValueSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleValueComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatValueComparator;
+import org.apache.flink.api.common.typeutils.base.FloatValueSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueComparator;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
+import org.apache.flink.api.common.typeutils.base.LongValueComparator;
+import org.apache.flink.api.common.typeutils.base.LongValueSerializer;
+import org.apache.flink.api.common.typeutils.base.NullValueComparator;
+import org.apache.flink.api.common.typeutils.base.NullValueSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortValueComparator;
+import org.apache.flink.api.common.typeutils.base.ShortValueSerializer;
+import org.apache.flink.api.common.typeutils.base.StringValueComparator;
+import org.apache.flink.api.common.typeutils.base.StringValueSerializer;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
@@ -126,7 +146,37 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
@SuppressWarnings("unchecked")
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
- if (CopyableValue.class.isAssignableFrom(type)) {
+ if (BooleanValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) BooleanValueSerializer.INSTANCE;
+ }
+ else if (ByteValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) ByteValueSerializer.INSTANCE;
+ }
+ else if (CharValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) CharValueSerializer.INSTANCE;
+ }
+ else if (DoubleValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) DoubleValueSerializer.INSTANCE;
+ }
+ else if (FloatValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) FloatValueSerializer.INSTANCE;
+ }
+ else if (IntValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) IntValueSerializer.INSTANCE;
+ }
+ else if (LongValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) LongValueSerializer.INSTANCE;
+ }
+ else if (NullValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) NullValueSerializer.INSTANCE;
+ }
+ else if (ShortValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) ShortValueSerializer.INSTANCE;
+ }
+ else if (StringValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<T>) StringValueSerializer.INSTANCE;
+ }
+ else if (CopyableValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
}
else {
@@ -141,8 +191,38 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
if (!isKeyType()) {
throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
}
-
- if (CopyableValue.class.isAssignableFrom(type)) {
+
+ if (BooleanValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new BooleanValueComparator(sortOrderAscending);
+ }
+ else if (ByteValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new ByteValueComparator(sortOrderAscending);
+ }
+ else if (CharValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new CharValueComparator(sortOrderAscending);
+ }
+ else if (DoubleValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new DoubleValueComparator(sortOrderAscending);
+ }
+ else if (FloatValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new FloatValueComparator(sortOrderAscending);
+ }
+ else if (IntValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new IntValueComparator(sortOrderAscending);
+ }
+ else if (LongValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new LongValueComparator(sortOrderAscending);
+ }
+ else if (NullValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) NullValueComparator.getInstance();
+ }
+ else if (ShortValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new ShortValueComparator(sortOrderAscending);
+ }
+ else if (StringValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<T>) new StringValueComparator(sortOrderAscending);
+ }
+ else if (CopyableValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type);
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
new file mode 100644
index 0000000..62ca95d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+/**
+ * Supplies {@code ComparatorTestBase} with a {@code BooleanValueComparator}, a
+ * {@code BooleanValueSerializer} and the sorted test data {@code FALSE, TRUE}.
+ */
+public class BooleanValueComparatorTest extends ComparatorTestBase<BooleanValue> {
+
+    /** Creates the comparator under test with the requested sort direction. */
+    @Override
+    protected TypeComparator<BooleanValue> createComparator(boolean ascending) {
+        return new BooleanValueComparator(ascending);
+    }
+
+    /** Creates a new {@code BooleanValueSerializer}. */
+    @Override
+    protected TypeSerializer<BooleanValue> createSerializer() {
+        return new BooleanValueSerializer();
+    }
+
+    /** Returns the two test values with {@code FALSE} first and {@code TRUE} second. */
+    @Override
+    protected BooleanValue[] getSortedTestData() {
+        final BooleanValue[] sorted = new BooleanValue[2];
+        sorted[0] = BooleanValue.FALSE;
+        sorted[1] = BooleanValue.TRUE;
+        return sorted;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
new file mode 100644
index 0000000..f14e3d2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+import java.util.Random;
+
+/** Plugs {@link ByteValueComparator} and sorted sample bytes into {@link ComparatorTestBase}. */
+public class ByteValueComparatorTest extends ComparatorTestBase<ByteValue> {
+    @Override
+    protected TypeComparator<ByteValue> createComparator(boolean ascending) {
+        return new ByteValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<ByteValue> createSerializer() {
+        return new ByteValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: both extremes, zero and its neighbours,
+     * plus one seeded pseudo-random magnitude in both signs.
+     */
+    @Override
+    protected ByteValue[] getSortedTestData() {
+        // Fixed seed: the same "random" magnitude is produced on every run.
+        Random rnd = new Random(874597969123412338L);
+        // nextInt(bound) yields [0, Byte.MAX_VALUE), so the draw is never negative and never
+        // equal to Byte.MAX_VALUE; only a collision with the fixed 0, 1 and 2 must be avoided.
+        int rndByte = rnd.nextInt(Byte.MAX_VALUE);
+        if (rndByte <= 2) {
+            rndByte += 3;
+        }
+        return new ByteValue[]{
+            new ByteValue(Byte.MIN_VALUE),
+            new ByteValue((byte) -rndByte),
+            new ByteValue((byte) -1),
+            new ByteValue((byte) 0),
+            new ByteValue((byte) 1),
+            new ByteValue((byte) 2),
+            new ByteValue((byte) rndByte),
+            new ByteValue(Byte.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
new file mode 100644
index 0000000..f952fc4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+import java.util.Random;
+
+/** Plugs {@link CharValueComparator} and sorted sample chars into {@link ComparatorTestBase}. */
+public class CharValueComparatorTest extends ComparatorTestBase<CharValue> {
+    @Override
+    protected TypeComparator<CharValue> createComparator(boolean ascending) {
+        return new CharValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<CharValue> createSerializer() {
+        return new CharValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: both extremes of {@code char} and two
+     * adjacent code units derived from a seeded pseudo-random draw.
+     */
+    @Override
+    protected CharValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible; nextInt(bound) yields [0, MAX_VALUE).
+        Random rnd = new Random(874597969123412338L);
+        int rndChar = rnd.nextInt(Character.MAX_VALUE);
+        if (rndChar == Character.MIN_VALUE) {
+            rndChar += 2; // must not repeat the first element
+        } else if (rndChar + 1 >= Character.MAX_VALUE) {
+            rndChar -= 2; // rndChar + 1 must stay below the last element
+        }
+        return new CharValue[]{
+            new CharValue(Character.MIN_VALUE),
+            new CharValue((char) rndChar),
+            new CharValue((char) (rndChar + 1)),
+            new CharValue(Character.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
new file mode 100644
index 0000000..b50ceca
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+import java.util.Random;
+
+/** Plugs {@link DoubleValueComparator} and sorted sample doubles into {@link ComparatorTestBase}. */
+public class DoubleValueComparatorTest extends ComparatorTestBase<DoubleValue> {
+
+    @Override
+    protected TypeComparator<DoubleValue> createComparator(boolean ascending) {
+        return new DoubleValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<DoubleValue> createSerializer() {
+        return new DoubleValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: a seeded pseudo-random magnitude in both
+     * signs, a few small fixed values and {@link Double#MAX_VALUE}.
+     */
+    @Override
+    protected DoubleValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible across runs.
+        Random rnd = new Random(874597969123412338L);
+        // nextDouble() yields [0.0, 1.0), so the draw is never negative and never near
+        // Double.MAX_VALUE. The offset moves it into [3.0, 4.0): larger than the fixed 2.0,
+        // and its negation smaller than the fixed -1.0.
+        double rndDouble = rnd.nextDouble() + 3;
+        return new DoubleValue[]{
+            new DoubleValue(-rndDouble),
+            new DoubleValue(-1.0D),
+            new DoubleValue(0.0D),
+            new DoubleValue(2.0D),
+            new DoubleValue(rndDouble),
+            new DoubleValue(Double.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
new file mode 100644
index 0000000..67c5ba9
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+import java.util.Random;
+
+/** Plugs {@link FloatValueComparator} and sorted sample floats into {@link ComparatorTestBase}. */
+public class FloatValueComparatorTest extends ComparatorTestBase<FloatValue> {
+
+    @Override
+    protected TypeComparator<FloatValue> createComparator(boolean ascending) {
+        return new FloatValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<FloatValue> createSerializer() {
+        return new FloatValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: a seeded pseudo-random magnitude in both
+     * signs, a few small fixed values and {@link Float#MAX_VALUE}.
+     */
+    @Override
+    protected FloatValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible across runs.
+        Random rnd = new Random(874597969123412338L);
+        // nextFloat() yields [0.0, 1.0), so the draw is never negative and never near
+        // Float.MAX_VALUE. The offset moves it into [3.0, 4.0): larger than the fixed 2.0,
+        // and its negation smaller than the fixed -1.0.
+        float rndFloat = rnd.nextFloat() + 3;
+        return new FloatValue[]{
+            new FloatValue(-rndFloat),
+            new FloatValue(-1.0F),
+            new FloatValue(0.0F),
+            new FloatValue(2.0F),
+            new FloatValue(rndFloat),
+            new FloatValue(Float.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
new file mode 100644
index 0000000..037eb56
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+import java.util.Random;
+
+/** Plugs {@link IntValueComparator} and sorted sample ints into {@link ComparatorTestBase}. */
+public class IntValueComparatorTest extends ComparatorTestBase<IntValue> {
+
+    @Override
+    protected TypeComparator<IntValue> createComparator(boolean ascending) {
+        return new IntValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<IntValue> createSerializer() {
+        return new IntValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: both extremes, zero and its neighbours,
+     * plus one seeded pseudo-random magnitude in both signs. The magnitude is clamped so
+     * that it cannot collide with any of the fixed entries.
+     */
+    @Override
+    protected IntValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible across runs.
+        Random rnd = new Random(874597969123412338L);
+        // Math.abs leaves Integer.MIN_VALUE negative (its negation overflows), so clamp the
+        // magnitude into [3, MAX_VALUE - 3]: it then sorts strictly between 2 and MAX_VALUE,
+        // and its negation strictly between MIN_VALUE and -1.
+        int rndInt = Math.max(3, Math.min(Math.abs(rnd.nextInt()), Integer.MAX_VALUE - 3));
+        return new IntValue[]{
+            new IntValue(Integer.MIN_VALUE),
+            new IntValue(-rndInt),
+            new IntValue(-1),
+            new IntValue(0),
+            new IntValue(1),
+            new IntValue(2),
+            new IntValue(rndInt),
+            new IntValue(Integer.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
new file mode 100644
index 0000000..6871a6b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+import java.util.Random;
+
+/** Plugs {@link LongValueComparator} and sorted sample longs into {@link ComparatorTestBase}. */
+public class LongValueComparatorTest extends ComparatorTestBase<LongValue> {
+
+    @Override
+    protected TypeComparator<LongValue> createComparator(boolean ascending) {
+        return new LongValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<LongValue> createSerializer() {
+        return new LongValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: both extremes, zero and its neighbours,
+     * plus one seeded pseudo-random magnitude in both signs.
+     */
+    @Override
+    protected LongValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible across runs.
+        Random rnd = new Random(874597969123412338L);
+        // Math.abs leaves Long.MIN_VALUE negative (its negation overflows), so clamp the
+        // magnitude into [3, MAX_VALUE - 3]: it then sorts strictly between 2 and MAX_VALUE,
+        // and its negation strictly between MIN_VALUE and -1.
+        long rndLong = Math.max(3L, Math.min(Math.abs(rnd.nextLong()), Long.MAX_VALUE - 3L));
+        return new LongValue[]{
+            new LongValue(Long.MIN_VALUE),
+            new LongValue(-rndLong),
+            new LongValue(-1L),
+            new LongValue(0L),
+            new LongValue(1L),
+            new LongValue(2L),
+            new LongValue(rndLong),
+            new LongValue(Long.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
new file mode 100644
index 0000000..7d4a3a0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+import java.util.Random;
+
+/** Plugs {@link ShortValueComparator} and sorted sample shorts into {@link ComparatorTestBase}. */
+public class ShortValueComparatorTest extends ComparatorTestBase<ShortValue> {
+
+    @Override
+    protected TypeComparator<ShortValue> createComparator(boolean ascending) {
+        return new ShortValueComparator(ascending);
+    }
+
+    @Override
+    protected TypeSerializer<ShortValue> createSerializer() {
+        return new ShortValueSerializer();
+    }
+
+    /**
+     * Returns strictly ascending sample values: both extremes, zero and its neighbours,
+     * plus one seeded pseudo-random magnitude in both signs.
+     */
+    @Override
+    protected ShortValue[] getSortedTestData() {
+        // Fixed seed keeps the data reproducible across runs.
+        Random rnd = new Random(874597969123412338L);
+        // Take the absolute value in int so that Short.MIN_VALUE becomes 32768 instead of
+        // wrapping back to itself, then clamp into [3, MAX_VALUE - 3] before narrowing.
+        int magnitude = Math.abs((short) rnd.nextInt());
+        short rndShort = (short) Math.max(3, Math.min(magnitude, Short.MAX_VALUE - 3));
+        return new ShortValue[]{
+            new ShortValue(Short.MIN_VALUE),
+            new ShortValue((short) -rndShort),
+            new ShortValue((short) -1),
+            new ShortValue((short) 0),
+            new ShortValue((short) 1),
+            new ShortValue((short) 2),
+            new ShortValue(rndShort),
+            new ShortValue(Short.MAX_VALUE)};
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/08b075aa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
new file mode 100644
index 0000000..e0b2f70
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+/** Plugs {@link StringValueComparator} and sorted sample strings into {@link ComparatorTestBase}. */
+public class StringValueComparatorTest extends ComparatorTestBase<StringValue> {
+
+    @Override
+    protected TypeSerializer<StringValue> createSerializer() {
+        return new StringValueSerializer();
+    }
+
+    @Override
+    protected TypeComparator<StringValue> createComparator(boolean ascending) {
+        return new StringValueComparator(ascending);
+    }
+
+    /** Sample strings in expected ascending order; note the capitalised entry precedes "aaaa". */
+    @Override
+    protected StringValue[] getSortedTestData() {
+        final String[] ascending = {
+            "", "Lorem Ipsum Dolor Omit Longer", "aaaa", "abcd", "abce", "abdd", "accd", "bbcd"
+        };
+        final StringValue[] sorted = new StringValue[ascending.length];
+        for (int i = 0; i < ascending.length; i++) {
+            sorted[i] = new StringValue(ascending[i]);
+        }
+        return sorted;
+    }
+}