You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:26 UTC

[3/6] flink git commit: [FLINK-3695] [gelly] ValueArray types

[FLINK-3695] [gelly] ValueArray types

Provide compact and efficiently serializable and comparable array
implementations for Flink mutable Value types and Java primitives.

This cloeses #3382


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f46e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f46e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f46e7

Branch: refs/heads/master
Commit: 963f46e7179db034fd1d444469f4af58eac87409
Parents: 43158a8
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Feb 21 11:40:22 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 11:14:36 2017 -0400

----------------------------------------------------------------------
 flink-libraries/flink-gelly/pom.xml             |  12 +-
 .../graph/types/valuearray/IntValueArray.java   | 398 ++++++++++++++
 .../valuearray/IntValueArrayComparator.java     | 156 ++++++
 .../valuearray/IntValueArraySerializer.java     |  85 +++
 .../graph/types/valuearray/LongValueArray.java  | 399 ++++++++++++++
 .../valuearray/LongValueArrayComparator.java    | 156 ++++++
 .../valuearray/LongValueArraySerializer.java    |  85 +++
 .../graph/types/valuearray/NullValueArray.java  | 267 ++++++++++
 .../valuearray/NullValueArrayComparator.java    | 147 ++++++
 .../valuearray/NullValueArraySerializer.java    |  85 +++
 .../types/valuearray/StringValueArray.java      | 518 +++++++++++++++++++
 .../valuearray/StringValueArrayComparator.java  | 217 ++++++++
 .../valuearray/StringValueArraySerializer.java  |  85 +++
 .../graph/types/valuearray/ValueArray.java      |  97 ++++
 .../types/valuearray/ValueArrayFactory.java     |  81 +++
 .../types/valuearray/ValueArrayTypeInfo.java    | 159 ++++++
 .../valuearray/ValueArrayTypeInfoFactory.java   |  41 ++
 .../valuearray/IntValueArrayComparatorTest.java |  51 ++
 .../valuearray/IntValueArraySerializerTest.java |  93 ++++
 .../types/valuearray/IntValueArrayTest.java     | 123 +++++
 .../LongValueArrayComparatorTest.java           |  51 ++
 .../LongValueArraySerializerTest.java           |  93 ++++
 .../types/valuearray/LongValueArrayTest.java    | 123 +++++
 .../NullValueArrayComparatorTest.java           |  51 ++
 .../NullValueArraySerializerTest.java           |  68 +++
 .../types/valuearray/NullValueArrayTest.java    |  80 +++
 .../StringValueArrayComparatorTest.java         |  51 ++
 .../StringValueArraySerializerTest.java         |  93 ++++
 .../types/valuearray/StringValueArrayTest.java  | 168 ++++++
 .../valuearray/ValueArrayTypeInfoTest.java      |  64 +++
 30 files changed, 4095 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
index f773c70..fa09102 100644
--- a/flink-libraries/flink-gelly/pom.xml
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -28,7 +28,7 @@ under the License.
 		<version>1.3-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
-	
+
 	<artifactId>flink-gelly_2.10</artifactId>
 	<name>flink-gelly</name>
 
@@ -59,7 +59,7 @@ under the License.
 		</dependency>
 
 		<!-- test dependencies -->
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
@@ -69,6 +69,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-optimizer_2.10</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
new file mode 100644
index 0000000..0e3812d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link IntValue}.
+ */
+public class IntValueArray
+implements ValueArray<IntValue> {
+
+	protected static final int ELEMENT_LENGTH_IN_BYTES = 4;
+
+	protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+	// see note in ArrayList, HashTable, ...
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+	private boolean isBounded;
+
+	private int[] data;
+
+	// the number of elements currently stored
+	private int position;
+
+	// location of the bookmark used by mark() and reset()
+	private transient int mark;
+
+	// hasher used to generate the normalized key
+	private Murmur3_32 hash = new Murmur3_32(0x11d2d865);
+
+	// hash result stored as normalized key
+	private IntValue hashValue = new IntValue();
+
+	/**
+	 * Initializes an expandable array with default capacity.
+	 */
+	public IntValueArray() {
+		isBounded = false;
+		initialize(DEFAULT_CAPACITY_IN_BYTES);
+	}
+
+	/**
+	 * Initializes a fixed-size array with the provided number of bytes.
+	 *
+	 * @param bytes number of bytes of the encapsulated array
+	 */
+	public IntValueArray(int bytes) {
+		isBounded = true;
+		initialize(bytes);
+	}
+
+	/**
+	 * Initializes the array with the provided number of bytes.
+	 *
+	 * @param bytes initial size of the encapsulated array in bytes
+	 */
+	private void initialize(int bytes) {
+		int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+		Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+		Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+		data = new int[capacity];
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * If the size of the array is insufficient to hold the given capacity then
+	 * copy the array into a new, larger array.
+	 *
+	 * @param minCapacity minimum required number of elements
+	 */
+	private void ensureCapacity(int minCapacity) {
+		long currentCapacity = data.length;
+
+		if (minCapacity <= currentCapacity) {
+			return;
+		}
+
+		// increase capacity by at least ~50%
+		long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+		int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+		if (newCapacity < minCapacity) {
+			// throw exception as unbounded arrays are not expected to fill
+			throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+		}
+
+		data = Arrays.copyOf(data, newCapacity);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		for (int idx = 0 ; idx < this.position ; idx++) {
+			sb.append(data[idx]);
+			if (idx < position - 1) {
+				sb.append(",");
+			}
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Iterable
+	// --------------------------------------------------------------------------------------------
+
+	private final ReadIterator iterator = new ReadIterator();
+
+	@Override
+	public Iterator<IntValue> iterator() {
+		iterator.reset();
+		return iterator;
+	}
+
+	private class ReadIterator
+	implements Iterator<IntValue> {
+		private IntValue value = new IntValue();
+
+		private int pos;
+
+		@Override
+		public boolean hasNext() {
+			return pos < position;
+		}
+
+		@Override
+		public IntValue next() {
+			value.setValue(data[pos++]);
+			return value;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("remove");
+		}
+
+		public void reset() {
+			pos = 0;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IOReadableWritable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(position);
+
+		for (int i = 0 ; i < position ; i++) {
+			out.writeInt(data[i]);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		position = in.readInt();
+		mark = 0;
+
+		ensureCapacity(position);
+
+		for (int i = 0 ; i < position ; i++) {
+			data[i] = in.readInt();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// NormalizableKey
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getMaxNormalizedKeyLen() {
+		return hashValue.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+		hash.reset();
+
+		hash.hash(position);
+		for (int i = 0 ; i < position ; i++) {
+			hash.hash(data[i]);
+		}
+
+		hashValue.setValue(hash.hash());
+		hashValue.copyNormalizedKey(target, offset, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareTo(ValueArray<IntValue> o) {
+		IntValueArray other = (IntValueArray) o;
+
+		int min = Math.min(position, other.position);
+		for (int i = 0 ; i < min ; i++) {
+			int cmp = Integer.compare(data[i], other.data[i]);
+
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return Integer.compare(position, other.position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Key
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int hash = 1;
+
+		for (int i = 0 ; i < position ; i++) {
+			hash = 31 * hash + data[i];
+		}
+
+		return hash;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof IntValueArray) {
+			IntValueArray other = (IntValueArray) obj;
+
+			if (position != other.position) {
+				return false;
+			}
+
+			for (int i = 0 ; i < position ; i++) {
+				if (data[i] != other.data[i]) {
+					return false;
+				}
+			}
+
+			return true;
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ResettableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setValue(ValueArray<IntValue> value) {
+		value.copyTo(this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// CopyableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getBinaryLength() {
+		return -1;
+	}
+
+	@Override
+	public void copyTo(ValueArray<IntValue> target) {
+		IntValueArray other = (IntValueArray) target;
+
+		other.position = position;
+		other.mark = mark;
+
+		other.ensureCapacity(position);
+		System.arraycopy(data, 0, other.data, 0, position);
+	}
+
+	@Override
+	public ValueArray<IntValue> copy() {
+		ValueArray<IntValue> copy = new IntValueArray();
+
+		this.copyTo(copy);
+
+		return copy;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		copyInternal(source, target);
+	}
+
+	protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+		int count = source.readInt();
+		target.writeInt(count);
+
+		int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+		target.write(source, bytes);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ValueArray
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		return position;
+	}
+
+	@Override
+	public boolean isFull() {
+		if (isBounded) {
+			return position == data.length;
+		} else {
+			return position == MAX_ARRAY_SIZE;
+		}
+	}
+
+	@Override
+	public boolean add(IntValue value) {
+		int newPosition = position + 1;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		data[position] = value.getValue();
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(ValueArray<IntValue> other) {
+		IntValueArray source = (IntValueArray) other;
+
+		int sourceSize = source.position;
+		int newPosition = position + sourceSize;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		System.arraycopy(source.data, 0, data, position, sourceSize);
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public void clear() {
+		position = 0;
+	}
+
+	@Override
+	public void mark() {
+		mark = position;
+	}
+
+	@Override
+	public void reset() {
+		position = mark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
new file mode 100644
index 0000000..bbc9bc5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for IntValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class IntValueArrayComparator extends TypeComparator<IntValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final IntValueArray reference = new IntValueArray();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public IntValueArrayComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(IntValueArray record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(IntValueArray toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(IntValueArray candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<IntValueArray> referencedComparator) {
+		int comp = ((IntValueArrayComparator) referencedComparator).reference.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(IntValueArray first, IntValueArray second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+
+		int minCount = Math.min(firstCount, secondCount);
+		while (minCount-- > 0) {
+			int firstValue = firstSource.readInt();
+			int secondValue = secondSource.readInt();
+
+			int cmp = Integer.compare(firstValue, secondValue);
+			if (cmp != 0) {
+				return ascendingComparison ? cmp : -cmp;
+			}
+		}
+
+		int cmp = Integer.compare(firstCount, secondCount);
+		return ascendingComparison ? cmp : -cmp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(IntValueArray.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(IntValueArray record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<IntValueArray> duplicate() {
+		return new IntValueArrayComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(IntValueArray record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public IntValueArray readWithKeyDenormalization(IntValueArray reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
new file mode 100644
index 0000000..b86fe87
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code IntValueArray}.
+ */
+public final class IntValueArraySerializer extends TypeSerializerSingleton<IntValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public IntValueArray createInstance() {
+		return new IntValueArray();
+	}
+
+	@Override
+	public IntValueArray copy(IntValueArray from) {
+		return copy(from, new IntValueArray());
+	}
+
+	@Override
+	public IntValueArray copy(IntValueArray from, IntValueArray reuse) {
+		reuse.setValue(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(IntValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public IntValueArray deserialize(DataInputView source) throws IOException {
+		return deserialize(new IntValueArray(), source);
+	}
+
+	@Override
+	public IntValueArray deserialize(IntValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		IntValueArray.copyInternal(source, target);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntValueArraySerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
new file mode 100644
index 0000000..7c01e6c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link LongValue}.
+ */
+public class LongValueArray
+implements ValueArray<LongValue> {
+
+	protected static final int ELEMENT_LENGTH_IN_BYTES = 8;
+
+	protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+	// see note in ArrayList, HashTable, ...
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+	private boolean isBounded;
+
+	private long[] data;
+
+	// the number of elements currently stored
+	private int position;
+
+	// location of the bookmark used by mark() and reset()
+	private transient int mark;
+
+	// hasher used to generate the normalized key
+	private Murmur3_32 hash = new Murmur3_32(0xdf099ea8);
+
+	// hash result stored as normalized key
+	private IntValue hashValue = new IntValue();
+
+	/**
+	 * Initializes an expandable array with default capacity.
+	 */
+	public LongValueArray() {
+		isBounded = false;
+		initialize(DEFAULT_CAPACITY_IN_BYTES);
+	}
+
+	/**
+	 * Initializes a fixed-size array with the provided number of bytes.
+	 *
+	 * @param bytes number of bytes of the encapsulated array
+	 */
+	public LongValueArray(int bytes) {
+		isBounded = true;
+		initialize(bytes);
+	}
+
+	/**
+	 * Initializes the array with the provided number of bytes.
+	 *
+	 * @param bytes initial size of the encapsulated array in bytes
+	 */
+	private void initialize(int bytes) {
+		int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+		Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+		Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+		data = new long[capacity];
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * If the size of the array is insufficient to hold the given capacity then
+	 * copy the array into a new, larger array.
+	 *
+	 * @param minCapacity minimum required number of elements
+	 */
+	private void ensureCapacity(int minCapacity) {
+		long currentCapacity = data.length;
+
+		if (minCapacity <= currentCapacity) {
+			return;
+		}
+
+		// increase capacity by at least ~50%
+		long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+		int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+		if (newCapacity < minCapacity) {
+			// throw exception as unbounded arrays are not expected to fill
+			throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+		}
+
+		data = Arrays.copyOf(data, newCapacity);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		for (int idx = 0 ; idx < this.position ; idx++) {
+			sb.append(data[idx]);
+			if (idx < position - 1) {
+				sb.append(",");
+			}
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Iterable
+	// --------------------------------------------------------------------------------------------
+
+	private final ReadIterator iterator = new ReadIterator();
+
+	@Override
+	public Iterator<LongValue> iterator() {
+		iterator.reset();
+		return iterator;
+	}
+
+	private class ReadIterator
+	implements Iterator<LongValue> {
+		private LongValue value = new LongValue();
+
+		private int pos;
+
+		@Override
+		public boolean hasNext() {
+			return pos < position;
+		}
+
+		@Override
+		public LongValue next() {
+			value.setValue(data[pos++]);
+			return value;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("remove");
+		}
+
+		public void reset() {
+			pos = 0;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IOReadableWritable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(position);
+
+		for (int i = 0 ; i < position ; i++) {
+			out.writeLong(data[i]);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		position = in.readInt();
+		mark = 0;
+
+		ensureCapacity(position);
+
+		for (int i = 0 ; i < position ; i++) {
+			data[i] = in.readLong();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// NormalizableKey
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getMaxNormalizedKeyLen() {
+		return hashValue.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+		hash.reset();
+
+		hash.hash(position);
+		for (int i = 0 ; i < position ; i++) {
+			hash.hash(data[i]);
+		}
+
+		hashValue.setValue(hash.hash());
+		hashValue.copyNormalizedKey(target, offset, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareTo(ValueArray<LongValue> o) {
+		LongValueArray other = (LongValueArray) o;
+
+		int min = Math.min(position, other.position);
+		for (int i = 0 ; i < min ; i++) {
+			int cmp = Long.compare(data[i], other.data[i]);
+
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return Integer.compare(position, other.position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Key
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int hash = 1;
+
+		for (int i = 0 ; i < position ; i++) {
+			hash = 31 * hash + (int) (data[i] ^ data[i] >>> 32);
+		}
+
+		return hash;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof LongValueArray) {
+			LongValueArray other = (LongValueArray) obj;
+
+			if (position != other.position) {
+				return false;
+			}
+
+			for (int i = 0 ; i < position ; i++) {
+				if (data[i] != other.data[i]) {
+					return false;
+				}
+			}
+
+			return true;
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ResettableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setValue(ValueArray<LongValue> value) {
+		value.copyTo(this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// CopyableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getBinaryLength() {
+		return -1;
+	}
+
+	@Override
+	public void copyTo(ValueArray<LongValue> target) {
+		LongValueArray other = (LongValueArray) target;
+
+		other.position = position;
+		other.mark = mark;
+
+		other.ensureCapacity(position);
+		System.arraycopy(data, 0, other.data, 0, position);
+	}
+
+	@Override
+	public ValueArray<LongValue> copy() {
+		ValueArray<LongValue> copy = new LongValueArray();
+
+		this.copyTo(copy);
+
+		return copy;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		copyInternal(source, target);
+	}
+
+	protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+		int count = source.readInt();
+		target.writeInt(count);
+
+		int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+		target.write(source, bytes);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ValueArray
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		return position;
+	}
+
+	@Override
+	public boolean isFull() {
+		if (isBounded) {
+			return position == data.length;
+		} else {
+			return position == MAX_ARRAY_SIZE;
+		}
+	}
+
+	@Override
+	public boolean add(LongValue value) {
+		int newPosition = position + 1;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		data[position] = value.getValue();
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(ValueArray<LongValue> other) {
+		LongValueArray source = (LongValueArray) other;
+
+		int sourceSize = source.position;
+		int newPosition = position + sourceSize;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		System.arraycopy(source.data, 0, data, position, sourceSize);
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public void clear() {
+		position = 0;
+	}
+
+	@Override
+	public void mark() {
+		mark = position;
+	}
+
+	@Override
+	public void reset() {
+		position = mark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
new file mode 100644
index 0000000..26c3da2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for LongValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class LongValueArrayComparator extends TypeComparator<LongValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final LongValueArray reference = new LongValueArray();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public LongValueArrayComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(LongValueArray record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(LongValueArray toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(LongValueArray candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<LongValueArray> referencedComparator) {
+		int comp = ((LongValueArrayComparator) referencedComparator).reference.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(LongValueArray first, LongValueArray second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+
+		int minCount = Math.min(firstCount, secondCount);
+		while (minCount-- > 0) {
+			long firstValue = firstSource.readLong();
+			long secondValue = secondSource.readLong();
+
+			int cmp = Long.compare(firstValue, secondValue);
+			if (cmp != 0) {
+				return ascendingComparison ? cmp : -cmp;
+			}
+		}
+
+		int cmp = Integer.compare(firstCount, secondCount);
+		return ascendingComparison ? cmp : -cmp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(LongValueArray.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(LongValueArray record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<LongValueArray> duplicate() {
+		return new LongValueArrayComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(LongValueArray record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public LongValueArray readWithKeyDenormalization(LongValueArray reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
new file mode 100644
index 0000000..95219b6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code LongValueArray}.
+ */
+public final class LongValueArraySerializer extends TypeSerializerSingleton<LongValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public LongValueArray createInstance() {
+		return new LongValueArray();
+	}
+
+	@Override
+	public LongValueArray copy(LongValueArray from) {
+		return copy(from, new LongValueArray());
+	}
+
+	@Override
+	public LongValueArray copy(LongValueArray from, LongValueArray reuse) {
+		reuse.setValue(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(LongValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public LongValueArray deserialize(DataInputView source) throws IOException {
+		return deserialize(new LongValueArray(), source);
+	}
+
+	@Override
+	public LongValueArray deserialize(LongValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		LongValueArray.copyInternal(source, target);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof LongValueArraySerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
new file mode 100644
index 0000000..bf247a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An array of {@link NullValue}.
+ */
+public class NullValueArray
+implements ValueArray<NullValue> {
+
+	// the number of elements currently stored
+	private int position;
+
+	// location of the bookmark used by mark() and reset()
+	private transient int mark;
+
+	// hash result stored as normalized key
+	private IntValue hashValue = new IntValue();
+
+	/**
+	 * Initializes an expandable array with default capacity.
+	 */
+	public NullValueArray() {
+	}
+
+	/**
+	 * Initializes a fixed-size array with the provided number of bytes.
+	 *
+	 * @param bytes number of bytes of the encapsulated array
+	 */
+	public NullValueArray(int bytes) {
+		this();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		for (int idx = 0 ; idx < this.position ; idx++) {
+			sb.append("\u2205");
+			if (idx < position - 1) {
+				sb.append(",");
+			}
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Iterable
+	// --------------------------------------------------------------------------------------------
+
+	private final ReadIterator iterator = new ReadIterator();
+
+	@Override
+	public Iterator<NullValue> iterator() {
+		iterator.reset();
+		return iterator;
+	}
+
+	private class ReadIterator
+	implements Iterator<NullValue> {
+		private int pos;
+
+		@Override
+		public boolean hasNext() {
+			return pos < position;
+		}
+
+		@Override
+		public NullValue next() {
+			pos++;
+			return NullValue.getInstance();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("remove");
+		}
+
+		public void reset() {
+			pos = 0;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IOReadableWritable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(position);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		position = in.readInt();
+		mark = 0;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// NormalizableKey
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getMaxNormalizedKeyLen() {
+		return hashValue.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+		hashValue.setValue(position);
+		hashValue.copyNormalizedKey(target, offset, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareTo(ValueArray<NullValue> o) {
+		NullValueArray other = (NullValueArray) o;
+
+		return Integer.compare(position, other.position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Key
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return position;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof NullValueArray) {
+			NullValueArray other = (NullValueArray) obj;
+
+			return position == other.position;
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ResettableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setValue(ValueArray<NullValue> value) {
+		value.copyTo(this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// CopyableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getBinaryLength() {
+		return hashValue.getBinaryLength();
+	}
+
+	@Override
+	public void copyTo(ValueArray<NullValue> target) {
+		NullValueArray other = (NullValueArray) target;
+
+		other.position = position;
+	}
+
+	@Override
+	public ValueArray<NullValue> copy() {
+		ValueArray<NullValue> copy = new NullValueArray();
+
+		this.copyTo(copy);
+
+		return copy;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.write(source, getBinaryLength());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ValueArray
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		return position;
+	}
+
+	@Override
+	public boolean isFull() {
+		return position == Integer.MAX_VALUE;
+	}
+
+	@Override
+	public boolean add(NullValue value) {
+		if (position == Integer.MAX_VALUE) {
+			return false;
+		}
+
+		position++;
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(ValueArray<NullValue> other) {
+		NullValueArray source = (NullValueArray) other;
+
+		long newPosition = position + (long) source.position;
+
+		if (newPosition > Integer.MAX_VALUE) {
+			return false;
+		}
+
+		position = (int) newPosition;
+
+		return true;
+	}
+
+	@Override
+	public void clear() {
+		position = 0;
+	}
+
+	@Override
+	public void mark() {
+		mark = position;
+	}
+
+	@Override
+	public void reset() {
+		position = mark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
new file mode 100644
index 0000000..2228d6e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for NullValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class NullValueArrayComparator extends TypeComparator<NullValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final NullValueArray reference = new NullValueArray();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public NullValueArrayComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(NullValueArray record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(NullValueArray toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(NullValueArray candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<NullValueArray> referencedComparator) {
+		int comp = ((NullValueArrayComparator) referencedComparator).reference.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(NullValueArray first, NullValueArray second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+
+		int cmp = Integer.compare(firstCount, secondCount);
+		return ascendingComparison ? cmp : -cmp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(NullValueArray.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(NullValueArray record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<NullValueArray> duplicate() {
+		return new NullValueArrayComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// key normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		// see ComparatorTestBase#testNormalizedKeyReadWriter fixes in FLINK-4705
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(NullValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public NullValueArray readWithKeyDenormalization(NullValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
new file mode 100644
index 0000000..233ed20
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code NullValueArray}.
+ */
+public final class NullValueArraySerializer extends TypeSerializerSingleton<NullValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public NullValueArray createInstance() {
+		return new NullValueArray();
+	}
+
+	@Override
+	public NullValueArray copy(NullValueArray from) {
+		return copy(from, new NullValueArray());
+	}
+
+	@Override
+	public NullValueArray copy(NullValueArray from, NullValueArray reuse) {
+		reuse.setValue(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return 4;
+	}
+
+	@Override
+	public void serialize(NullValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public NullValueArray deserialize(DataInputView source) throws IOException {
+		return deserialize(new NullValueArray(), source);
+	}
+
+	@Override
+	public NullValueArray deserialize(NullValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.write(source, getLength());
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof NullValueArraySerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
new file mode 100644
index 0000000..4699552
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link StringValue}.
+ * <p>
+ * Strings are serialized to a byte array. Concatenating arrays is as simple
+ * and fast as extending and copying byte arrays. Strings are serialized when
+ * individually added to {@code StringValueArray}.
+ * <p>
+ * For each string added to the array the length is first serialized using a
+ * variable length integer. Then the string characters are serialized using a
+ * variable length encoding where the lower 128 ASCII/UFT-8 characters are
+ * encoded in a single byte. This ensures that common characters are serialized
+ * in only two bytes.
+ */
+public class StringValueArray
+implements ValueArray<StringValue> {
+
+	protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+	// see note in ArrayList, HashTable, ...
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+	protected static final int HIGH_BIT = 0x1 << 7;
+
+	private boolean isBounded;
+
+	// the initial length of a bounded array, which is allowed to expand to
+	// store one additional element beyond this initial length
+	private int boundedLength;
+
+	private byte[] data;
+
+	// number of StringValue elements currently stored
+	private int length;
+
+	// the number of bytes currently stored
+	private int position;
+
+	// state for the bookmark used by mark() and reset()
+	private transient int markLength;
+
+	private transient int markPosition;
+
+	// hasher used to generate the normalized key
+	private Murmur3_32 hash = new Murmur3_32(0x19264330);
+
+	// hash result stored as normalized key
+	private IntValue hashValue = new IntValue();
+
+	/**
+	 * Initializes an expandable array with default capacity.
+	 */
+	public StringValueArray() {
+		isBounded = false;
+		initialize(DEFAULT_CAPACITY_IN_BYTES);
+	}
+
+	/**
+	 * Initializes a fixed-size array with the provided number of bytes.
+	 *
+	 * @param bytes number of bytes of the encapsulated array
+	 */
+	public StringValueArray(int bytes) {
+		isBounded = true;
+		boundedLength = bytes;
+		initialize(bytes);
+	}
+
+	/**
+	 * Initializes the array with the provided number of bytes.
+	 *
+	 * @param bytes initial size of the encapsulated array in bytes
+	 */
+	private void initialize(int bytes) {
+		Preconditions.checkArgument(bytes > 0, "Requested array with zero capacity");
+		Preconditions.checkArgument(bytes <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+		data = new byte[bytes];
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * If the size of the array is insufficient to hold the given capacity then
+	 * copy the array into a new, larger array.
+	 *
+	 * @param minCapacity minimum required number of elements
+	 */
+	private void ensureCapacity(int minCapacity) {
+		long currentCapacity = data.length;
+
+		if (minCapacity <= currentCapacity) {
+			return;
+		}
+
+		// increase capacity by at least ~50%
+		long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+		int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+		if (newCapacity < minCapacity) {
+			// throw exception as unbounded arrays are not expected to fill
+			throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+		}
+
+		data = Arrays.copyOf(data, newCapacity);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		String separator = "";
+
+		for (StringValue sv : this) {
+			sb
+				.append(sv.getValue())
+				.append(separator);
+			separator = ",";
+		}
+
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Iterable
+	// --------------------------------------------------------------------------------------------
+
+	private final ReadIterator iterator = new ReadIterator();
+
+	@Override
+	public Iterator<StringValue> iterator() {
+		iterator.reset();
+		return iterator;
+	}
+
+	private class ReadIterator
+	implements Iterator<StringValue> {
+		private static final int DEFAULT_SIZE = 64;
+
+		private StringValue value = new StringValue(CharBuffer.allocate(DEFAULT_SIZE));
+
+		private int size = DEFAULT_SIZE;
+
+		private int pos;
+
+		@Override
+		public boolean hasNext() {
+			return pos < position;
+		}
+
+		@Override
+		public StringValue next() {
+			// read length
+			int len = data[pos++] & 0xFF;
+
+			if (len >= HIGH_BIT) {
+				int shift = 7;
+				int curr;
+				len = len & 0x7F;
+				while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+					len |= (curr & 0x7F) << shift;
+					shift += 7;
+				}
+				len |= curr << shift;
+			}
+
+			// ensure capacity
+			if (len > size) {
+				while (size < len) {
+					size *= 2;
+				}
+
+				value = new StringValue(CharBuffer.allocate(size));
+			}
+
+			// read string characters
+			final char[] valueData = value.getCharArray();
+
+			for (int i = 0; i < len; i++) {
+				int c = data[pos++] & 0xFF;
+				if (c >= HIGH_BIT) {
+					int shift = 7;
+					int curr;
+					c = c & 0x7F;
+					while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+						c |= (curr & 0x7F) << shift;
+						shift += 7;
+					}
+					c |= curr << shift;
+				}
+				valueData[i] = (char) c;
+			}
+
+			return value;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("remove");
+		}
+
+		public void reset() {
+			pos = 0;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IOReadableWritable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(length);
+		out.writeInt(position);
+
+		out.write(data, 0, position);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		length = in.readInt();
+		position = in.readInt();
+
+		markLength = 0;
+		markPosition = 0;
+
+		ensureCapacity(position);
+
+		in.read(data, 0, position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// NormalizableKey
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getMaxNormalizedKeyLen() {
+		return hashValue.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+		hash.reset();
+
+		hash.hash(position);
+		for (int i = 0 ; i < position ; i++) {
+			hash.hash(data[i]);
+		}
+
+		hashValue.setValue(hash.hash());
+		hashValue.copyNormalizedKey(target, offset, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareTo(ValueArray<StringValue> o) {
+		StringValueArray other = (StringValueArray) o;
+
+		// sorts first on number of data in the array, then comparison between
+		// the first non-equal element in the arrays
+		int cmp = Integer.compare(position, other.position);
+
+		if (cmp != 0) {
+			return cmp;
+		}
+
+		for (int i = 0 ; i < position ; i++) {
+			cmp = Byte.compare(data[i], other.data[i]);
+
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return 0;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Key
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int hash = 1;
+
+		for (int i = 0 ; i < position ; i++) {
+			hash = 31 * hash + data[i];
+		}
+
+		return hash;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StringValueArray) {
+			StringValueArray other = (StringValueArray) obj;
+
+			if (length != other.length) {
+				return false;
+			}
+
+			if (position != other.position) {
+				return false;
+			}
+
+			for (int i = 0 ; i < position ; i++) {
+				if (data[i] != other.data[i]) {
+					return false;
+				}
+			}
+
+			return true;
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ResettableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setValue(ValueArray<StringValue> value) {
+		value.copyTo(this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// CopyableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getBinaryLength() {
+		return -1;
+	}
+
+	@Override
+	public void copyTo(ValueArray<StringValue> target) {
+		StringValueArray other = (StringValueArray) target;
+
+		other.length = length;
+		other.position = position;
+		other.markLength = markLength;
+		other.markPosition = markPosition;
+
+		other.ensureCapacity(position);
+		System.arraycopy(data, 0, other.data, 0, position);
+	}
+
+	@Override
+	public ValueArray<StringValue> copy() {
+		ValueArray<StringValue> copy = new StringValueArray();
+
+		this.copyTo(copy);
+
+		return copy;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		copyInternal(source, target);
+	}
+
+	protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+		int length = source.readInt();
+		target.writeInt(length);
+
+		int position = source.readInt();
+		target.writeInt(position);
+
+		target.write(source, position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ValueArray
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		return length;
+	}
+
+	@Override
+	public boolean isFull() {
+		if (isBounded) {
+			return position >= boundedLength;
+		} else {
+			return position == MAX_ARRAY_SIZE;
+		}
+	}
+
+	@Override
+	public boolean add(StringValue value) {
+		if (isBounded && position >= boundedLength) {
+			return false;
+		}
+
+		// up to five bytes storing length
+		if (position + 5 > data.length) {
+			ensureCapacity(position + 5);
+		}
+
+		// update local variable until serialization succeeds
+		int newPosition = position;
+
+		// write the length, variable-length encoded
+		int len = value.length();
+
+		while (len >= HIGH_BIT) {
+			data[newPosition++] = (byte) (len | HIGH_BIT);
+			len >>>= 7;
+		}
+		data[newPosition++] = (byte) len;
+
+		// write the char data, variable-length encoded
+		final char[] valueData = value.getCharArray();
+		int remainingCapacity = data.length - newPosition;
+
+		len = value.length();
+		for (int i = 0; i < len; i++) {
+			// up to three bytes storing length
+			if (remainingCapacity < 3) {
+				ensureCapacity(remainingCapacity + 3);
+				remainingCapacity = data.length - newPosition;
+			}
+
+			int c = valueData[i];
+
+			while (c >= HIGH_BIT) {
+				data[newPosition++] = (byte) (c | HIGH_BIT);
+				remainingCapacity--;
+				c >>>= 7;
+			}
+			data[newPosition++] = (byte) c;
+			remainingCapacity--;
+		}
+
+		length++;
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(ValueArray<StringValue> other) {
+		StringValueArray source = (StringValueArray) other;
+
+		int sourceSize = source.position;
+		int newPosition = position + sourceSize;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		System.arraycopy(source.data, 0, data, position, sourceSize);
+		length += source.length;
+  	    position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public void clear() {
+		length = 0;
+		position = 0;
+	}
+
+	@Override
+	public void mark() {
+		markLength = length;
+		markPosition = position;
+	}
+
+	@Override
+	public void reset() {
+		length = markLength;
+		position = markPosition;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
new file mode 100644
index 0000000..df88a8e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+import static org.apache.flink.graph.types.valuearray.StringValueArray.HIGH_BIT;
+
+/**
+ * Specialized comparator for StringValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class StringValueArrayComparator extends TypeComparator<StringValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final StringValueArray reference = new StringValueArray();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public StringValueArrayComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(StringValueArray record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(StringValueArray toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(StringValueArray candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<StringValueArray> referencedComparator) {
+		int comp = ((StringValueArrayComparator) referencedComparator).reference.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(StringValueArray first, StringValueArray second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	/**
+	 * Read the length of the next serialized {@code StringValue}.
+	 *
+	 * @param source the input view containing the record
+	 * @return the length of the next serialized {@code StringValue}
+	 * @throws IOException if the input view raised an exception when reading the length
+	 */
+	private static int readStringLength(DataInputView source) throws IOException {
+		int len = source.readByte() & 0xFF;
+
+		if (len >= HIGH_BIT) {
+			int shift = 7;
+			int curr;
+			len = len & 0x7F;
+			while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+				len |= (curr & 0x7F) << shift;
+				shift += 7;
+			}
+			len |= curr << shift;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Read the next character from the serialized {@code StringValue}.
+	 *
+	 * @param source the input view containing the record
+	 * @return the next {@code char} of the current serialized {@code StringValue}
+	 * @throws IOException if the input view raised an exception when reading the length
+	 */
+	private static char readStringChar(DataInputView source) throws IOException {
+		int c = source.readByte() & 0xFF;
+
+		if (c >= HIGH_BIT) {
+			int shift = 7;
+			int curr;
+			c = c & 0x7F;
+			while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+				c |= (curr & 0x7F) << shift;
+				shift += 7;
+			}
+			c |= curr << shift;
+		}
+
+		return (char) c;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+
+		int minCount = Math.min(firstCount, secondCount);
+		while (minCount-- > 0) {
+			int firstLength = readStringLength(firstSource);
+			int secondLength = readStringLength(secondSource);
+
+			int minLength = Math.min(firstLength, secondLength);
+			while (minLength-- > 0) {
+				char firstChar = readStringChar(firstSource);
+				char secondChar = readStringChar(secondSource);
+
+				int cmp = Character.compare(firstChar, secondChar);
+				if (cmp != 0) {
+					return ascendingComparison ? cmp : -cmp;
+				}
+			}
+
+			int cmp = Integer.compare(firstLength, secondLength);
+			if (cmp != 0) {
+				return ascendingComparison ? cmp : -cmp;
+			}
+		}
+
+		int cmp = Integer.compare(firstCount, secondCount);
+		return ascendingComparison ? cmp : -cmp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(StringValueArray.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(StringValueArray record, MemorySegment target, int offset, int numBytes) {
+		record.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<StringValueArray> duplicate() {
+		return new StringValueArrayComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(StringValueArray record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public StringValueArray readWithKeyDenormalization(StringValueArray reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
new file mode 100644
index 0000000..0e875e3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code StringValueArray}.
+ */
+public final class StringValueArraySerializer extends TypeSerializerSingleton<StringValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public StringValueArray createInstance() {
+		return new StringValueArray();
+	}
+
+	@Override
+	public StringValueArray copy(StringValueArray from) {
+		return copy(from, new StringValueArray());
+	}
+
+	@Override
+	public StringValueArray copy(StringValueArray from, StringValueArray reuse) {
+		reuse.setValue(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(StringValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public StringValueArray deserialize(DataInputView source) throws IOException {
+		return deserialize(new StringValueArray(), source);
+	}
+
+	@Override
+	public StringValueArray deserialize(StringValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		StringValueArray.copyInternal(source, target);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StringValueArraySerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
new file mode 100644
index 0000000..6e34b71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.ResettableValue;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface for array types which reuse objects during serialization.
+ *
+ * Value arrays are usable as grouping keys but not sorting keys.
+ *
+ * @param <T> the {@link Value} type
+ */
+@TypeInfo(ValueArrayTypeInfoFactory.class)
+public interface ValueArray<T>
+extends Iterable<T>, IOReadableWritable, Serializable, NormalizableKey<ValueArray<T>>, ResettableValue<ValueArray<T>>, CopyableValue<ValueArray<T>> {
+
+	/**
+	 * Returns the number of elements stored in the array.
+	 *
+	 * @return the number of elements stored in the array
+	 */
+	int size();
+
+	/**
+	 * An bounded array fills when the allocated capacity has been fully used.
+	 * An unbounded array will only fill when the underlying data structure has
+	 * reached capacity, for example the ~2^31 element limit for Java arrays.
+	 *
+	 * @return whether the array is full
+	 */
+	boolean isFull();
+
+	/**
+	 * Appends the value to this array if and only if the array capacity would
+	 * not be exceeded.
+	 *
+	 * @param value the value to add to this array
+	 * @return whether the value was added to the array
+	 */
+	boolean add(T value);
+
+	/**
+	 * Appends all of the values in the specified array to the end of this
+	 * array. If the combined array would exceed capacity then no values are
+	 * appended.
+	 *
+	 * @param source array containing values to be added to this array
+	 * @return whether the values were added to the array
+	 */
+	boolean addAll(ValueArray<T> source);
+
+	/**
+	 * Saves the array index, which can be restored by calling {@code reset()}.
+	 *
+	 * This is not serialized and is not part of the contract for
+	 * {@link #equals(Object)}.
+	 */
+	void mark();
+
+	/**
+	 * Restores the array index to when {@code mark()} was last called.
+	 */
+	void reset();
+
+	/**
+	 * Resets the array to the empty state. The implementation is *not*
+	 * expected to release the underlying data structure. This allows the array
+	 * to be reused with minimal impact on the garbage collector.
+	 *
+	 * This may reset the {@link #mark()} in order to allow arrays be shrunk.
+	 */
+	void clear();
+}