You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:26 UTC
[3/6] flink git commit: [FLINK-3695] [gelly] ValueArray types
[FLINK-3695] [gelly] ValueArray types
Provide compact and efficiently serializable and comparable array
implementations for Flink mutable Value types and Java primitives.
This cloeses #3382
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f46e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f46e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f46e7
Branch: refs/heads/master
Commit: 963f46e7179db034fd1d444469f4af58eac87409
Parents: 43158a8
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Feb 21 11:40:22 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 11:14:36 2017 -0400
----------------------------------------------------------------------
flink-libraries/flink-gelly/pom.xml | 12 +-
.../graph/types/valuearray/IntValueArray.java | 398 ++++++++++++++
.../valuearray/IntValueArrayComparator.java | 156 ++++++
.../valuearray/IntValueArraySerializer.java | 85 +++
.../graph/types/valuearray/LongValueArray.java | 399 ++++++++++++++
.../valuearray/LongValueArrayComparator.java | 156 ++++++
.../valuearray/LongValueArraySerializer.java | 85 +++
.../graph/types/valuearray/NullValueArray.java | 267 ++++++++++
.../valuearray/NullValueArrayComparator.java | 147 ++++++
.../valuearray/NullValueArraySerializer.java | 85 +++
.../types/valuearray/StringValueArray.java | 518 +++++++++++++++++++
.../valuearray/StringValueArrayComparator.java | 217 ++++++++
.../valuearray/StringValueArraySerializer.java | 85 +++
.../graph/types/valuearray/ValueArray.java | 97 ++++
.../types/valuearray/ValueArrayFactory.java | 81 +++
.../types/valuearray/ValueArrayTypeInfo.java | 159 ++++++
.../valuearray/ValueArrayTypeInfoFactory.java | 41 ++
.../valuearray/IntValueArrayComparatorTest.java | 51 ++
.../valuearray/IntValueArraySerializerTest.java | 93 ++++
.../types/valuearray/IntValueArrayTest.java | 123 +++++
.../LongValueArrayComparatorTest.java | 51 ++
.../LongValueArraySerializerTest.java | 93 ++++
.../types/valuearray/LongValueArrayTest.java | 123 +++++
.../NullValueArrayComparatorTest.java | 51 ++
.../NullValueArraySerializerTest.java | 68 +++
.../types/valuearray/NullValueArrayTest.java | 80 +++
.../StringValueArrayComparatorTest.java | 51 ++
.../StringValueArraySerializerTest.java | 93 ++++
.../types/valuearray/StringValueArrayTest.java | 168 ++++++
.../valuearray/ValueArrayTypeInfoTest.java | 64 +++
30 files changed, 4095 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
index f773c70..fa09102 100644
--- a/flink-libraries/flink-gelly/pom.xml
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -28,7 +28,7 @@ under the License.
<version>1.3-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
-
+
<artifactId>flink-gelly_2.10</artifactId>
<name>flink-gelly</name>
@@ -59,7 +59,7 @@ under the License.
</dependency>
<!-- test dependencies -->
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
@@ -69,6 +69,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
new file mode 100644
index 0000000..0e3812d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link IntValue}.
+ */
+public class IntValueArray
+implements ValueArray<IntValue> {
+
+ protected static final int ELEMENT_LENGTH_IN_BYTES = 4;
+
+ protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+ // see note in ArrayList, HashTable, ...
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ private boolean isBounded;
+
+ private int[] data;
+
+ // the number of elements currently stored
+ private int position;
+
+ // location of the bookmark used by mark() and reset()
+ private transient int mark;
+
+ // hasher used to generate the normalized key
+ private Murmur3_32 hash = new Murmur3_32(0x11d2d865);
+
+ // hash result stored as normalized key
+ private IntValue hashValue = new IntValue();
+
+ /**
+ * Initializes an expandable array with default capacity.
+ */
+ public IntValueArray() {
+ isBounded = false;
+ initialize(DEFAULT_CAPACITY_IN_BYTES);
+ }
+
+ /**
+ * Initializes a fixed-size array with the provided number of bytes.
+ *
+ * @param bytes number of bytes of the encapsulated array
+ */
+ public IntValueArray(int bytes) {
+ isBounded = true;
+ initialize(bytes);
+ }
+
+ /**
+ * Initializes the array with the provided number of bytes.
+ *
+ * @param bytes initial size of the encapsulated array in bytes
+ */
+ private void initialize(int bytes) {
+ int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+ Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+ Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+ data = new int[capacity];
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * If the size of the array is insufficient to hold the given capacity then
+ * copy the array into a new, larger array.
+ *
+ * @param minCapacity minimum required number of elements
+ */
+ private void ensureCapacity(int minCapacity) {
+ long currentCapacity = data.length;
+
+ if (minCapacity <= currentCapacity) {
+ return;
+ }
+
+ // increase capacity by at least ~50%
+ long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+ int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+ if (newCapacity < minCapacity) {
+ // throw exception as unbounded arrays are not expected to fill
+ throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+ }
+
+ data = Arrays.copyOf(data, newCapacity);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ for (int idx = 0 ; idx < this.position ; idx++) {
+ sb.append(data[idx]);
+ if (idx < position - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iterable
+ // --------------------------------------------------------------------------------------------
+
+ private final ReadIterator iterator = new ReadIterator();
+
+ @Override
+ public Iterator<IntValue> iterator() {
+ iterator.reset();
+ return iterator;
+ }
+
+ private class ReadIterator
+ implements Iterator<IntValue> {
+ private IntValue value = new IntValue();
+
+ private int pos;
+
+ @Override
+ public boolean hasNext() {
+ return pos < position;
+ }
+
+ @Override
+ public IntValue next() {
+ value.setValue(data[pos++]);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IOReadableWritable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(position);
+
+ for (int i = 0 ; i < position ; i++) {
+ out.writeInt(data[i]);
+ }
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ position = in.readInt();
+ mark = 0;
+
+ ensureCapacity(position);
+
+ for (int i = 0 ; i < position ; i++) {
+ data[i] = in.readInt();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // NormalizableKey
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getMaxNormalizedKeyLen() {
+ return hashValue.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+ hash.reset();
+
+ hash.hash(position);
+ for (int i = 0 ; i < position ; i++) {
+ hash.hash(data[i]);
+ }
+
+ hashValue.setValue(hash.hash());
+ hashValue.copyNormalizedKey(target, offset, len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int compareTo(ValueArray<IntValue> o) {
+ IntValueArray other = (IntValueArray) o;
+
+ int min = Math.min(position, other.position);
+ for (int i = 0 ; i < min ; i++) {
+ int cmp = Integer.compare(data[i], other.data[i]);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return Integer.compare(position, other.position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Key
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+
+ for (int i = 0 ; i < position ; i++) {
+ hash = 31 * hash + data[i];
+ }
+
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof IntValueArray) {
+ IntValueArray other = (IntValueArray) obj;
+
+ if (position != other.position) {
+ return false;
+ }
+
+ for (int i = 0 ; i < position ; i++) {
+ if (data[i] != other.data[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ResettableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void setValue(ValueArray<IntValue> value) {
+ value.copyTo(this);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // CopyableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getBinaryLength() {
+ return -1;
+ }
+
+ @Override
+ public void copyTo(ValueArray<IntValue> target) {
+ IntValueArray other = (IntValueArray) target;
+
+ other.position = position;
+ other.mark = mark;
+
+ other.ensureCapacity(position);
+ System.arraycopy(data, 0, other.data, 0, position);
+ }
+
+ @Override
+ public ValueArray<IntValue> copy() {
+ ValueArray<IntValue> copy = new IntValueArray();
+
+ this.copyTo(copy);
+
+ return copy;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ copyInternal(source, target);
+ }
+
+ protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+ int count = source.readInt();
+ target.writeInt(count);
+
+ int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+ target.write(source, bytes);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ValueArray
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ return position;
+ }
+
+ @Override
+ public boolean isFull() {
+ if (isBounded) {
+ return position == data.length;
+ } else {
+ return position == MAX_ARRAY_SIZE;
+ }
+ }
+
+ @Override
+ public boolean add(IntValue value) {
+ int newPosition = position + 1;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ data[position] = value.getValue();
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(ValueArray<IntValue> other) {
+ IntValueArray source = (IntValueArray) other;
+
+ int sourceSize = source.position;
+ int newPosition = position + sourceSize;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ System.arraycopy(source.data, 0, data, position, sourceSize);
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ position = 0;
+ }
+
+ @Override
+ public void mark() {
+ mark = position;
+ }
+
+ @Override
+ public void reset() {
+ position = mark;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
new file mode 100644
index 0000000..bbc9bc5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for IntValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class IntValueArrayComparator extends TypeComparator<IntValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final IntValueArray reference = new IntValueArray();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public IntValueArrayComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(IntValueArray record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(IntValueArray toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(IntValueArray candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<IntValueArray> referencedComparator) {
+ int comp = ((IntValueArrayComparator) referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(IntValueArray first, IntValueArray second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+
+ int minCount = Math.min(firstCount, secondCount);
+ while (minCount-- > 0) {
+ int firstValue = firstSource.readInt();
+ int secondValue = secondSource.readInt();
+
+ int cmp = Integer.compare(firstValue, secondValue);
+ if (cmp != 0) {
+ return ascendingComparison ? cmp : -cmp;
+ }
+ }
+
+ int cmp = Integer.compare(firstCount, secondCount);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(IntValueArray.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(IntValueArray record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<IntValueArray> duplicate() {
+ return new IntValueArrayComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(IntValueArray record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IntValueArray readWithKeyDenormalization(IntValueArray reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
new file mode 100644
index 0000000..b86fe87
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code IntValueArray}.
+ */
+public final class IntValueArraySerializer extends TypeSerializerSingleton<IntValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public IntValueArray createInstance() {
+ return new IntValueArray();
+ }
+
+ @Override
+ public IntValueArray copy(IntValueArray from) {
+ return copy(from, new IntValueArray());
+ }
+
+ @Override
+ public IntValueArray copy(IntValueArray from, IntValueArray reuse) {
+ reuse.setValue(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(IntValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public IntValueArray deserialize(DataInputView source) throws IOException {
+ return deserialize(new IntValueArray(), source);
+ }
+
+ @Override
+ public IntValueArray deserialize(IntValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ IntValueArray.copyInternal(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof IntValueArraySerializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
new file mode 100644
index 0000000..7c01e6c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link LongValue}.
+ */
+public class LongValueArray
+implements ValueArray<LongValue> {
+
+ protected static final int ELEMENT_LENGTH_IN_BYTES = 8;
+
+ protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+ // see note in ArrayList, HashTable, ...
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ private boolean isBounded;
+
+ private long[] data;
+
+ // the number of elements currently stored
+ private int position;
+
+ // location of the bookmark used by mark() and reset()
+ private transient int mark;
+
+ // hasher used to generate the normalized key
+ private Murmur3_32 hash = new Murmur3_32(0xdf099ea8);
+
+ // hash result stored as normalized key
+ private IntValue hashValue = new IntValue();
+
+ /**
+ * Initializes an expandable array with default capacity.
+ */
+ public LongValueArray() {
+ isBounded = false;
+ initialize(DEFAULT_CAPACITY_IN_BYTES);
+ }
+
+ /**
+ * Initializes a fixed-size array with the provided number of bytes.
+ *
+ * @param bytes number of bytes of the encapsulated array
+ */
+ public LongValueArray(int bytes) {
+ isBounded = true;
+ initialize(bytes);
+ }
+
+ /**
+ * Initializes the array with the provided number of bytes.
+ *
+ * @param bytes initial size of the encapsulated array in bytes
+ */
+ private void initialize(int bytes) {
+ int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+ Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+ Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+ data = new long[capacity];
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * If the size of the array is insufficient to hold the given capacity then
+ * copy the array into a new, larger array.
+ *
+ * @param minCapacity minimum required number of elements
+ */
+ private void ensureCapacity(int minCapacity) {
+ long currentCapacity = data.length;
+
+ if (minCapacity <= currentCapacity) {
+ return;
+ }
+
+ // increase capacity by at least ~50%
+ long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+ int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+ if (newCapacity < minCapacity) {
+ // throw exception as unbounded arrays are not expected to fill
+ throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+ }
+
+ data = Arrays.copyOf(data, newCapacity);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ for (int idx = 0 ; idx < this.position ; idx++) {
+ sb.append(data[idx]);
+ if (idx < position - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iterable
+ // --------------------------------------------------------------------------------------------
+
+ private final ReadIterator iterator = new ReadIterator();
+
+ @Override
+ public Iterator<LongValue> iterator() {
+ iterator.reset();
+ return iterator;
+ }
+
+ private class ReadIterator
+ implements Iterator<LongValue> {
+ private LongValue value = new LongValue();
+
+ private int pos;
+
+ @Override
+ public boolean hasNext() {
+ return pos < position;
+ }
+
+ @Override
+ public LongValue next() {
+ value.setValue(data[pos++]);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IOReadableWritable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(position);
+
+ for (int i = 0 ; i < position ; i++) {
+ out.writeLong(data[i]);
+ }
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ position = in.readInt();
+ mark = 0;
+
+ ensureCapacity(position);
+
+ for (int i = 0 ; i < position ; i++) {
+ data[i] = in.readLong();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // NormalizableKey
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getMaxNormalizedKeyLen() {
+ return hashValue.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+ hash.reset();
+
+ hash.hash(position);
+ for (int i = 0 ; i < position ; i++) {
+ hash.hash(data[i]);
+ }
+
+ hashValue.setValue(hash.hash());
+ hashValue.copyNormalizedKey(target, offset, len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int compareTo(ValueArray<LongValue> o) {
+ LongValueArray other = (LongValueArray) o;
+
+ int min = Math.min(position, other.position);
+ for (int i = 0 ; i < min ; i++) {
+ int cmp = Long.compare(data[i], other.data[i]);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return Integer.compare(position, other.position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Key
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+
+ for (int i = 0 ; i < position ; i++) {
+ hash = 31 * hash + (int) (data[i] ^ data[i] >>> 32);
+ }
+
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof LongValueArray) {
+ LongValueArray other = (LongValueArray) obj;
+
+ if (position != other.position) {
+ return false;
+ }
+
+ for (int i = 0 ; i < position ; i++) {
+ if (data[i] != other.data[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ResettableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void setValue(ValueArray<LongValue> value) {
+ value.copyTo(this);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // CopyableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getBinaryLength() {
+ return -1;
+ }
+
+ @Override
+ public void copyTo(ValueArray<LongValue> target) {
+ LongValueArray other = (LongValueArray) target;
+
+ other.position = position;
+ other.mark = mark;
+
+ other.ensureCapacity(position);
+ System.arraycopy(data, 0, other.data, 0, position);
+ }
+
+ @Override
+ public ValueArray<LongValue> copy() {
+ ValueArray<LongValue> copy = new LongValueArray();
+
+ this.copyTo(copy);
+
+ return copy;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ copyInternal(source, target);
+ }
+
+ protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+ int count = source.readInt();
+ target.writeInt(count);
+
+ int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+ target.write(source, bytes);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ValueArray
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ return position;
+ }
+
+ @Override
+ public boolean isFull() {
+ if (isBounded) {
+ return position == data.length;
+ } else {
+ return position == MAX_ARRAY_SIZE;
+ }
+ }
+
+ @Override
+ public boolean add(LongValue value) {
+ int newPosition = position + 1;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ data[position] = value.getValue();
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(ValueArray<LongValue> other) {
+ LongValueArray source = (LongValueArray) other;
+
+ int sourceSize = source.position;
+ int newPosition = position + sourceSize;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ System.arraycopy(source.data, 0, data, position, sourceSize);
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ position = 0;
+ }
+
+ @Override
+ public void mark() {
+ mark = position;
+ }
+
+ @Override
+ public void reset() {
+ position = mark;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
new file mode 100644
index 0000000..26c3da2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for LongValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class LongValueArrayComparator extends TypeComparator<LongValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final LongValueArray reference = new LongValueArray();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public LongValueArrayComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(LongValueArray record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(LongValueArray toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(LongValueArray candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<LongValueArray> referencedComparator) {
+ int comp = ((LongValueArrayComparator) referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(LongValueArray first, LongValueArray second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+
+ int minCount = Math.min(firstCount, secondCount);
+ while (minCount-- > 0) {
+ long firstValue = firstSource.readLong();
+ long secondValue = secondSource.readLong();
+
+ int cmp = Long.compare(firstValue, secondValue);
+ if (cmp != 0) {
+ return ascendingComparison ? cmp : -cmp;
+ }
+ }
+
+ int cmp = Integer.compare(firstCount, secondCount);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(LongValueArray.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(LongValueArray record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<LongValueArray> duplicate() {
+ return new LongValueArrayComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(LongValueArray record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LongValueArray readWithKeyDenormalization(LongValueArray reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
new file mode 100644
index 0000000..95219b6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code LongValueArray}.
+ */
+public final class LongValueArraySerializer extends TypeSerializerSingleton<LongValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public LongValueArray createInstance() {
+ return new LongValueArray();
+ }
+
+ @Override
+ public LongValueArray copy(LongValueArray from) {
+ return copy(from, new LongValueArray());
+ }
+
+ @Override
+ public LongValueArray copy(LongValueArray from, LongValueArray reuse) {
+ reuse.setValue(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(LongValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public LongValueArray deserialize(DataInputView source) throws IOException {
+ return deserialize(new LongValueArray(), source);
+ }
+
+ @Override
+ public LongValueArray deserialize(LongValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ LongValueArray.copyInternal(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof LongValueArraySerializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
new file mode 100644
index 0000000..bf247a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An array of {@link NullValue}.
+ */
+public class NullValueArray
+implements ValueArray<NullValue> {
+
+ // the number of elements currently stored
+ private int position;
+
+ // location of the bookmark used by mark() and reset()
+ private transient int mark;
+
+ // hash result stored as normalized key
+ private IntValue hashValue = new IntValue();
+
+ /**
+ * Initializes an expandable array with default capacity.
+ */
+ public NullValueArray() {
+ }
+
+ /**
+ * Initializes a fixed-size array with the provided number of bytes.
+ *
+ * @param bytes number of bytes of the encapsulated array
+ */
+ public NullValueArray(int bytes) {
+ this();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ for (int idx = 0 ; idx < this.position ; idx++) {
+ sb.append("\u2205");
+ if (idx < position - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iterable
+ // --------------------------------------------------------------------------------------------
+
+ private final ReadIterator iterator = new ReadIterator();
+
+ @Override
+ public Iterator<NullValue> iterator() {
+ iterator.reset();
+ return iterator;
+ }
+
+ private class ReadIterator
+ implements Iterator<NullValue> {
+ private int pos;
+
+ @Override
+ public boolean hasNext() {
+ return pos < position;
+ }
+
+ @Override
+ public NullValue next() {
+ pos++;
+ return NullValue.getInstance();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IOReadableWritable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(position);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ position = in.readInt();
+ mark = 0;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // NormalizableKey
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getMaxNormalizedKeyLen() {
+ return hashValue.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+ hashValue.setValue(position);
+ hashValue.copyNormalizedKey(target, offset, len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int compareTo(ValueArray<NullValue> o) {
+ NullValueArray other = (NullValueArray) o;
+
+ return Integer.compare(position, other.position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Key
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return position;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof NullValueArray) {
+ NullValueArray other = (NullValueArray) obj;
+
+ return position == other.position;
+ }
+
+ return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ResettableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void setValue(ValueArray<NullValue> value) {
+ value.copyTo(this);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // CopyableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getBinaryLength() {
+ return hashValue.getBinaryLength();
+ }
+
+ @Override
+ public void copyTo(ValueArray<NullValue> target) {
+ NullValueArray other = (NullValueArray) target;
+
+ other.position = position;
+ }
+
+ @Override
+ public ValueArray<NullValue> copy() {
+ ValueArray<NullValue> copy = new NullValueArray();
+
+ this.copyTo(copy);
+
+ return copy;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.write(source, getBinaryLength());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ValueArray
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ return position;
+ }
+
+ @Override
+ public boolean isFull() {
+ return position == Integer.MAX_VALUE;
+ }
+
+ @Override
+ public boolean add(NullValue value) {
+ if (position == Integer.MAX_VALUE) {
+ return false;
+ }
+
+ position++;
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(ValueArray<NullValue> other) {
+ NullValueArray source = (NullValueArray) other;
+
+ long newPosition = position + (long) source.position;
+
+ if (newPosition > Integer.MAX_VALUE) {
+ return false;
+ }
+
+ position = (int) newPosition;
+
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ position = 0;
+ }
+
+ @Override
+ public void mark() {
+ mark = position;
+ }
+
+ @Override
+ public void reset() {
+ position = mark;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
new file mode 100644
index 0000000..2228d6e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for NullValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class NullValueArrayComparator extends TypeComparator<NullValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final NullValueArray reference = new NullValueArray();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public NullValueArrayComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(NullValueArray record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(NullValueArray toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(NullValueArray candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<NullValueArray> referencedComparator) {
+ int comp = ((NullValueArrayComparator) referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(NullValueArray first, NullValueArray second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+
+ int cmp = Integer.compare(firstCount, secondCount);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(NullValueArray.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(NullValueArray record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<NullValueArray> duplicate() {
+ return new NullValueArrayComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // key normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ // see ComparatorTestBase#testNormalizedKeyReadWriter fixes in FLINK-4705
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(NullValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public NullValueArray readWithKeyDenormalization(NullValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
new file mode 100644
index 0000000..233ed20
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code NullValueArray}.
+ */
+public final class NullValueArraySerializer extends TypeSerializerSingleton<NullValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public NullValueArray createInstance() {
+ return new NullValueArray();
+ }
+
+ @Override
+ public NullValueArray copy(NullValueArray from) {
+ return copy(from, new NullValueArray());
+ }
+
+ @Override
+ public NullValueArray copy(NullValueArray from, NullValueArray reuse) {
+ reuse.setValue(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return 4;
+ }
+
+ @Override
+ public void serialize(NullValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public NullValueArray deserialize(DataInputView source) throws IOException {
+ return deserialize(new NullValueArray(), source);
+ }
+
+ @Override
+ public NullValueArray deserialize(NullValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.write(source, getLength());
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof NullValueArraySerializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
new file mode 100644
index 0000000..4699552
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link StringValue}.
+ * <p>
+ * Strings are serialized to a byte array. Concatenating arrays is as simple
+ * and fast as extending and copying byte arrays. Strings are serialized when
+ * individually added to {@code StringValueArray}.
+ * <p>
+ * For each string added to the array the length is first serialized using a
+ * variable length integer. Then the string characters are serialized using a
+ * variable length encoding where the lower 128 ASCII/UFT-8 characters are
+ * encoded in a single byte. This ensures that common characters are serialized
+ * in only two bytes.
+ */
+public class StringValueArray
+implements ValueArray<StringValue> {
+
+ protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+ // see note in ArrayList, HashTable, ...
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ protected static final int HIGH_BIT = 0x1 << 7;
+
+ private boolean isBounded;
+
+ // the initial length of a bounded array, which is allowed to expand to
+ // store one additional element beyond this initial length
+ private int boundedLength;
+
+ private byte[] data;
+
+ // number of StringValue elements currently stored
+ private int length;
+
+ // the number of bytes currently stored
+ private int position;
+
+ // state for the bookmark used by mark() and reset()
+ private transient int markLength;
+
+ private transient int markPosition;
+
+ // hasher used to generate the normalized key
+ private Murmur3_32 hash = new Murmur3_32(0x19264330);
+
+ // hash result stored as normalized key
+ private IntValue hashValue = new IntValue();
+
+ /**
+ * Initializes an expandable array with default capacity.
+ */
+ public StringValueArray() {
+ isBounded = false;
+ initialize(DEFAULT_CAPACITY_IN_BYTES);
+ }
+
+ /**
+ * Initializes a fixed-size array with the provided number of bytes.
+ *
+ * @param bytes number of bytes of the encapsulated array
+ */
+ public StringValueArray(int bytes) {
+ isBounded = true;
+ boundedLength = bytes;
+ initialize(bytes);
+ }
+
+ /**
+ * Initializes the array with the provided number of bytes.
+ *
+ * @param bytes initial size of the encapsulated array in bytes
+ */
+ private void initialize(int bytes) {
+ Preconditions.checkArgument(bytes > 0, "Requested array with zero capacity");
+ Preconditions.checkArgument(bytes <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+ data = new byte[bytes];
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * If the size of the array is insufficient to hold the given capacity then
+ * copy the array into a new, larger array.
+ *
+ * @param minCapacity minimum required number of elements
+ */
+ private void ensureCapacity(int minCapacity) {
+ long currentCapacity = data.length;
+
+ if (minCapacity <= currentCapacity) {
+ return;
+ }
+
+ // increase capacity by at least ~50%
+ long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+ int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+ if (newCapacity < minCapacity) {
+ // throw exception as unbounded arrays are not expected to fill
+ throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+ }
+
+ data = Arrays.copyOf(data, newCapacity);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ String separator = "";
+
+ for (StringValue sv : this) {
+ sb
+ .append(sv.getValue())
+ .append(separator);
+ separator = ",";
+ }
+
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iterable
+ // --------------------------------------------------------------------------------------------
+
+ private final ReadIterator iterator = new ReadIterator();
+
+ @Override
+ public Iterator<StringValue> iterator() {
+ iterator.reset();
+ return iterator;
+ }
+
+ private class ReadIterator
+ implements Iterator<StringValue> {
+ private static final int DEFAULT_SIZE = 64;
+
+ private StringValue value = new StringValue(CharBuffer.allocate(DEFAULT_SIZE));
+
+ private int size = DEFAULT_SIZE;
+
+ private int pos;
+
+ @Override
+ public boolean hasNext() {
+ return pos < position;
+ }
+
+ @Override
+ public StringValue next() {
+ // read length
+ int len = data[pos++] & 0xFF;
+
+ if (len >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ len = len & 0x7F;
+ while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+ len |= (curr & 0x7F) << shift;
+ shift += 7;
+ }
+ len |= curr << shift;
+ }
+
+ // ensure capacity
+ if (len > size) {
+ while (size < len) {
+ size *= 2;
+ }
+
+ value = new StringValue(CharBuffer.allocate(size));
+ }
+
+ // read string characters
+ final char[] valueData = value.getCharArray();
+
+ for (int i = 0; i < len; i++) {
+ int c = data[pos++] & 0xFF;
+ if (c >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ c = c & 0x7F;
+ while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+ c |= (curr & 0x7F) << shift;
+ shift += 7;
+ }
+ c |= curr << shift;
+ }
+ valueData[i] = (char) c;
+ }
+
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IOReadableWritable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(length);
+ out.writeInt(position);
+
+ out.write(data, 0, position);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ length = in.readInt();
+ position = in.readInt();
+
+ markLength = 0;
+ markPosition = 0;
+
+ ensureCapacity(position);
+
+ in.read(data, 0, position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // NormalizableKey
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getMaxNormalizedKeyLen() {
+ return hashValue.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+ hash.reset();
+
+ hash.hash(position);
+ for (int i = 0 ; i < position ; i++) {
+ hash.hash(data[i]);
+ }
+
+ hashValue.setValue(hash.hash());
+ hashValue.copyNormalizedKey(target, offset, len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int compareTo(ValueArray<StringValue> o) {
+ StringValueArray other = (StringValueArray) o;
+
+ // sorts first on number of data in the array, then comparison between
+ // the first non-equal element in the arrays
+ int cmp = Integer.compare(position, other.position);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ for (int i = 0 ; i < position ; i++) {
+ cmp = Byte.compare(data[i], other.data[i]);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Key
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int hash = 1;
+
+ for (int i = 0 ; i < position ; i++) {
+ hash = 31 * hash + data[i];
+ }
+
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof StringValueArray) {
+ StringValueArray other = (StringValueArray) obj;
+
+ if (length != other.length) {
+ return false;
+ }
+
+ if (position != other.position) {
+ return false;
+ }
+
+ for (int i = 0 ; i < position ; i++) {
+ if (data[i] != other.data[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ResettableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void setValue(ValueArray<StringValue> value) {
+ value.copyTo(this);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // CopyableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getBinaryLength() {
+ return -1;
+ }
+
+ @Override
+ public void copyTo(ValueArray<StringValue> target) {
+ StringValueArray other = (StringValueArray) target;
+
+ other.length = length;
+ other.position = position;
+ other.markLength = markLength;
+ other.markPosition = markPosition;
+
+ other.ensureCapacity(position);
+ System.arraycopy(data, 0, other.data, 0, position);
+ }
+
+ @Override
+ public ValueArray<StringValue> copy() {
+ ValueArray<StringValue> copy = new StringValueArray();
+
+ this.copyTo(copy);
+
+ return copy;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ copyInternal(source, target);
+ }
+
+ protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+ int length = source.readInt();
+ target.writeInt(length);
+
+ int position = source.readInt();
+ target.writeInt(position);
+
+ target.write(source, position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ValueArray
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ return length;
+ }
+
+ @Override
+ public boolean isFull() {
+ if (isBounded) {
+ return position >= boundedLength;
+ } else {
+ return position == MAX_ARRAY_SIZE;
+ }
+ }
+
+ @Override
+ public boolean add(StringValue value) {
+ if (isBounded && position >= boundedLength) {
+ return false;
+ }
+
+ // up to five bytes storing length
+ if (position + 5 > data.length) {
+ ensureCapacity(position + 5);
+ }
+
+ // update local variable until serialization succeeds
+ int newPosition = position;
+
+ // write the length, variable-length encoded
+ int len = value.length();
+
+ while (len >= HIGH_BIT) {
+ data[newPosition++] = (byte) (len | HIGH_BIT);
+ len >>>= 7;
+ }
+ data[newPosition++] = (byte) len;
+
+ // write the char data, variable-length encoded
+ final char[] valueData = value.getCharArray();
+ int remainingCapacity = data.length - newPosition;
+
+ len = value.length();
+ for (int i = 0; i < len; i++) {
+ // up to three bytes storing length
+ if (remainingCapacity < 3) {
+ ensureCapacity(remainingCapacity + 3);
+ remainingCapacity = data.length - newPosition;
+ }
+
+ int c = valueData[i];
+
+ while (c >= HIGH_BIT) {
+ data[newPosition++] = (byte) (c | HIGH_BIT);
+ remainingCapacity--;
+ c >>>= 7;
+ }
+ data[newPosition++] = (byte) c;
+ remainingCapacity--;
+ }
+
+ length++;
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(ValueArray<StringValue> other) {
+ StringValueArray source = (StringValueArray) other;
+
+ int sourceSize = source.position;
+ int newPosition = position + sourceSize;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ System.arraycopy(source.data, 0, data, position, sourceSize);
+ length += source.length;
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ length = 0;
+ position = 0;
+ }
+
+ @Override
+ public void mark() {
+ markLength = length;
+ markPosition = position;
+ }
+
+ @Override
+ public void reset() {
+ length = markLength;
+ position = markPosition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
new file mode 100644
index 0000000..df88a8e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+import static org.apache.flink.graph.types.valuearray.StringValueArray.HIGH_BIT;
+
+/**
+ * Specialized comparator for StringValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class StringValueArrayComparator extends TypeComparator<StringValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final StringValueArray reference = new StringValueArray();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public StringValueArrayComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(StringValueArray record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(StringValueArray toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(StringValueArray candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<StringValueArray> referencedComparator) {
+ int comp = ((StringValueArrayComparator) referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(StringValueArray first, StringValueArray second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ /**
+ * Read the length of the next serialized {@code StringValue}.
+ *
+ * @param source the input view containing the record
+ * @return the length of the next serialized {@code StringValue}
+ * @throws IOException if the input view raised an exception when reading the length
+ */
+ private static int readStringLength(DataInputView source) throws IOException {
+ int len = source.readByte() & 0xFF;
+
+ if (len >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ len = len & 0x7F;
+ while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+ len |= (curr & 0x7F) << shift;
+ shift += 7;
+ }
+ len |= curr << shift;
+ }
+
+ return len;
+ }
+
+ /**
+ * Read the next character from the serialized {@code StringValue}.
+ *
+ * @param source the input view containing the record
+ * @return the next {@code char} of the current serialized {@code StringValue}
+ * @throws IOException if the input view raised an exception when reading the length
+ */
+ private static char readStringChar(DataInputView source) throws IOException {
+ int c = source.readByte() & 0xFF;
+
+ if (c >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ c = c & 0x7F;
+ while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+ c |= (curr & 0x7F) << shift;
+ shift += 7;
+ }
+ c |= curr << shift;
+ }
+
+ return (char) c;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+
+ int minCount = Math.min(firstCount, secondCount);
+ while (minCount-- > 0) {
+ int firstLength = readStringLength(firstSource);
+ int secondLength = readStringLength(secondSource);
+
+ int minLength = Math.min(firstLength, secondLength);
+ while (minLength-- > 0) {
+ char firstChar = readStringChar(firstSource);
+ char secondChar = readStringChar(secondSource);
+
+ int cmp = Character.compare(firstChar, secondChar);
+ if (cmp != 0) {
+ return ascendingComparison ? cmp : -cmp;
+ }
+ }
+
+ int cmp = Integer.compare(firstLength, secondLength);
+ if (cmp != 0) {
+ return ascendingComparison ? cmp : -cmp;
+ }
+ }
+
+ int cmp = Integer.compare(firstCount, secondCount);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(StringValueArray.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(StringValueArray record, MemorySegment target, int offset, int numBytes) {
+ record.copyNormalizedKey(target, offset, numBytes);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<StringValueArray> duplicate() {
+ return new StringValueArrayComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(StringValueArray record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StringValueArray readWithKeyDenormalization(StringValueArray reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
new file mode 100644
index 0000000..0e875e3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code StringValueArray}.
+ */
+public final class StringValueArraySerializer extends TypeSerializerSingleton<StringValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public StringValueArray createInstance() {
+ return new StringValueArray();
+ }
+
+ @Override
+ public StringValueArray copy(StringValueArray from) {
+ return copy(from, new StringValueArray());
+ }
+
+ @Override
+ public StringValueArray copy(StringValueArray from, StringValueArray reuse) {
+ reuse.setValue(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(StringValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public StringValueArray deserialize(DataInputView source) throws IOException {
+ return deserialize(new StringValueArray(), source);
+ }
+
+ @Override
+ public StringValueArray deserialize(StringValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ StringValueArray.copyInternal(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof StringValueArraySerializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
new file mode 100644
index 0000000..6e34b71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.ResettableValue;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface for array types which reuse objects during serialization.
+ *
+ * Value arrays are usable as grouping keys but not sorting keys.
+ *
+ * @param <T> the {@link Value} type
+ */
+@TypeInfo(ValueArrayTypeInfoFactory.class)
+public interface ValueArray<T>
+extends Iterable<T>, IOReadableWritable, Serializable, NormalizableKey<ValueArray<T>>, ResettableValue<ValueArray<T>>, CopyableValue<ValueArray<T>> {
+
+ /**
+ * Returns the number of elements stored in the array.
+ *
+ * @return the number of elements stored in the array
+ */
+ int size();
+
+ /**
+ * An bounded array fills when the allocated capacity has been fully used.
+ * An unbounded array will only fill when the underlying data structure has
+ * reached capacity, for example the ~2^31 element limit for Java arrays.
+ *
+ * @return whether the array is full
+ */
+ boolean isFull();
+
+ /**
+ * Appends the value to this array if and only if the array capacity would
+ * not be exceeded.
+ *
+ * @param value the value to add to this array
+ * @return whether the value was added to the array
+ */
+ boolean add(T value);
+
+ /**
+ * Appends all of the values in the specified array to the end of this
+ * array. If the combined array would exceed capacity then no values are
+ * appended.
+ *
+ * @param source array containing values to be added to this array
+ * @return whether the values were added to the array
+ */
+ boolean addAll(ValueArray<T> source);
+
+ /**
+ * Saves the array index, which can be restored by calling {@code reset()}.
+ *
+ * This is not serialized and is not part of the contract for
+ * {@link #equals(Object)}.
+ */
+ void mark();
+
+ /**
+ * Restores the array index to when {@code mark()} was last called.
+ */
+ void reset();
+
+ /**
+ * Resets the array to the empty state. The implementation is *not*
+ * expected to release the underlying data structure. This allows the array
+ * to be reused with minimal impact on the garbage collector.
+ *
+ * This may reset the {@link #mark()} in order to allow arrays be shrunk.
+ */
+ void clear();
+}