You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/05 20:53:08 UTC

[3/6] flink git commit: [FLINK-7023] [gelly] Remaining types for Gelly ValueArrays

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
new file mode 100644
index 0000000..aa99989
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link ShortValue}.
+ */
+public class ShortValueArray
+implements ValueArray<ShortValue> {
+
+	protected static final int ELEMENT_LENGTH_IN_BYTES = 2;
+
+	protected static final int DEFAULT_CAPACITY_IN_BYTES = 1024;
+
+	// see note in ArrayList, HashTable, ...
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+	private boolean isBounded;
+
+	private short[] data;
+
+	// the number of elements currently stored
+	private int position;
+
+	// location of the bookmark used by mark() and reset()
+	private transient int mark;
+
+	// hasher used to generate the normalized key
+	private MurmurHash hash = new MurmurHash(0xb3148e81);
+
+	// hash result stored as normalized key
+	private IntValue hashValue = new IntValue();
+
+	/**
+	 * Initializes an expandable array with default capacity.
+	 */
+	public ShortValueArray() {
+		isBounded = false;
+		initialize(DEFAULT_CAPACITY_IN_BYTES);
+	}
+
+	/**
+	 * Initializes a fixed-size array with the provided number of shorts.
+	 *
+	 * @param bytes number of bytes of the encapsulated array
+	 */
+	public ShortValueArray(int bytes) {
+		isBounded = true;
+		initialize(bytes);
+	}
+
+	/**
+	 * Initializes the array with the provided number of bytes.
+	 *
+	 * @param bytes initial size of the encapsulated array in bytes
+	 */
+	private void initialize(int bytes) {
+		int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+		Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+		Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+		data = new short[capacity];
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * If the size of the array is insufficient to hold the given capacity then
+	 * copy the array into a new, larger array.
+	 *
+	 * @param minCapacity minimum required number of elements
+	 */
+	private void ensureCapacity(int minCapacity) {
+		long currentCapacity = data.length;
+
+		if (minCapacity <= currentCapacity) {
+			return;
+		}
+
+		// increase capacity by at least ~50%
+		long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+		int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+		if (newCapacity < minCapacity) {
+			// throw exception as unbounded arrays are not expected to fill
+			throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+		}
+
+		data = Arrays.copyOf(data, newCapacity);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		for (int idx = 0; idx < this.position; idx++) {
+			sb.append(data[idx]);
+			if (idx < position - 1) {
+				sb.append(",");
+			}
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Iterable
+	// --------------------------------------------------------------------------------------------
+
+	private final ReadIterator iterator = new ReadIterator();
+
+	@Override
+	public Iterator<ShortValue> iterator() {
+		iterator.reset();
+		return iterator;
+	}
+
+	private class ReadIterator
+	implements Iterator<ShortValue> {
+		private ShortValue value = new ShortValue();
+
+		private int pos;
+
+		@Override
+		public boolean hasNext() {
+			return pos < position;
+		}
+
+		@Override
+		public ShortValue next() {
+			value.setValue(data[pos++]);
+			return value;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("remove");
+		}
+
+		public void reset() {
+			pos = 0;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// IOReadableWritable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(position);
+
+		for (int i = 0; i < position; i++) {
+			out.writeShort(data[i]);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		position = in.readInt();
+		mark = 0;
+
+		ensureCapacity(position);
+
+		for (int i = 0; i < position; i++) {
+			data[i] = in.readShort();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// NormalizableKey
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getMaxNormalizedKeyLen() {
+		return hashValue.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+		hash.reset();
+
+		hash.hash(position);
+		for (int i = 0; i < position; i++) {
+			hash.hash(data[i]);
+		}
+
+		hashValue.setValue(hash.hash());
+		hashValue.copyNormalizedKey(target, offset, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareTo(ValueArray<ShortValue> o) {
+		ShortValueArray other = (ShortValueArray) o;
+
+		int min = Math.min(position, other.position);
+		for (int i = 0; i < min; i++) {
+			int cmp = Short.compare(data[i], other.data[i]);
+
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return Integer.compare(position, other.position);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Key
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		int hash = 0;
+
+		for (int i = 0; i < position; i++) {
+			hash = 31 * hash + data[i];
+		}
+
+		return hash;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ShortValueArray) {
+			ShortValueArray other = (ShortValueArray) obj;
+
+			if (position != other.position) {
+				return false;
+			}
+
+			for (int i = 0; i < position; i++) {
+				if (data[i] != other.data[i]) {
+					return false;
+				}
+			}
+
+			return true;
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ResettableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void setValue(ValueArray<ShortValue> value) {
+		value.copyTo(this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// CopyableValue
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int getBinaryLength() {
+		return -1;
+	}
+
+	@Override
+	public void copyTo(ValueArray<ShortValue> target) {
+		ShortValueArray other = (ShortValueArray) target;
+
+		other.position = position;
+		other.mark = mark;
+
+		other.ensureCapacity(position);
+		System.arraycopy(data, 0, other.data, 0, position);
+	}
+
+	@Override
+	public ValueArray<ShortValue> copy() {
+		ValueArray<ShortValue> copy = new ShortValueArray();
+
+		this.copyTo(copy);
+
+		return copy;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		copyInternal(source, target);
+	}
+
+	protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+		int count = source.readInt();
+		target.writeInt(count);
+
+		int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+		target.write(source, bytes);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// ValueArray
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int size() {
+		return position;
+	}
+
+	@Override
+	public boolean isFull() {
+		if (isBounded) {
+			return position == data.length;
+		} else {
+			return position == MAX_ARRAY_SIZE;
+		}
+	}
+
+	@Override
+	public boolean add(ShortValue value) {
+		int newPosition = position + 1;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		data[position] = value.getValue();
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(ValueArray<ShortValue> other) {
+		ShortValueArray source = (ShortValueArray) other;
+
+		int sourceSize = source.position;
+		int newPosition = position + sourceSize;
+
+		if (newPosition > data.length) {
+			if (isBounded) {
+				return false;
+			} else {
+				ensureCapacity(newPosition);
+			}
+		}
+
+		System.arraycopy(source.data, 0, data, position, sourceSize);
+		position = newPosition;
+
+		return true;
+	}
+
+	@Override
+	public void clear() {
+		position = 0;
+	}
+
+	@Override
+	public void mark() {
+		mark = position;
+	}
+
+	@Override
+	public void reset() {
+		position = mark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
new file mode 100644
index 0000000..6ebbeaa
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ShortValueArray based on CopyableValueComparator.
+ *
+ * <p>This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class ShortValueArrayComparator extends TypeComparator<ShortValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final boolean ascendingComparison;
+
+	private final ShortValueArray reference = new ShortValueArray();
+
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+	public ShortValueArrayComparator(boolean ascending) {
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(ShortValueArray record) {
+		return record.hashCode();
+	}
+
+	@Override
+	public void setReference(ShortValueArray toCompare) {
+		toCompare.copyTo(reference);
+	}
+
+	@Override
+	public boolean equalToReference(ShortValueArray candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<ShortValueArray> referencedComparator) {
+		int comp = ((ShortValueArrayComparator) referencedComparator).reference.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compare(ShortValueArray first, ShortValueArray second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+
+		int minCount = Math.min(firstCount, secondCount);
+		while (minCount-- > 0) {
+			short firstValue = firstSource.readShort();
+			short secondValue = secondSource.readShort();
+
+			int cmp = Short.compare(firstValue, secondValue);
+			if (cmp != 0) {
+				return ascendingComparison ? cmp : -cmp;
+			}
+		}
+
+		int cmp = Integer.compare(firstCount, secondCount);
+		return ascendingComparison ? cmp : -cmp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(ShortValueArray.class);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return reference.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyShorts) {
+		return keyShorts < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(ShortValueArray record, MemorySegment target, int offset, int numShorts) {
+		record.copyNormalizedKey(target, offset, numShorts);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<ShortValueArray> duplicate() {
+		return new ShortValueArrayComparator(ascendingComparison);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(ShortValueArray record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public ShortValueArray readWithKeyDenormalization(ShortValueArray reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
new file mode 100644
index 0000000..bce4d81
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code ShortValueArray}.
+ */
+public final class ShortValueArraySerializer extends TypeSerializerSingleton<ShortValueArray> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public ShortValueArray createInstance() {
+		return new ShortValueArray();
+	}
+
+	@Override
+	public ShortValueArray copy(ShortValueArray from) {
+		return copy(from, new ShortValueArray());
+	}
+
+	@Override
+	public ShortValueArray copy(ShortValueArray from, ShortValueArray reuse) {
+		reuse.setValue(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(ShortValueArray record, DataOutputView target) throws IOException {
+		record.write(target);
+	}
+
+	@Override
+	public ShortValueArray deserialize(DataInputView source) throws IOException {
+		return deserialize(new ShortValueArray(), source);
+	}
+
+	@Override
+	public ShortValueArray deserialize(ShortValueArray reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		ShortValueArray.copyInternal(source, target);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ShortValueArraySerializer;
+	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
index fabe990..0805a3d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -224,7 +224,8 @@ implements ValueArray<StringValue> {
 				valueData[i] = (char) c;
 			}
 
-			return value;
+			// cannot prevent allocation of new StringValue!
+			return value.substring(0, len);
 		}
 
 		@Override
@@ -317,7 +318,7 @@ implements ValueArray<StringValue> {
 
 	@Override
 	public int hashCode() {
-		int hash = 1;
+		int hash = 0;
 
 		for (int i = 0; i < position; i++) {
 			hash = 31 * hash + data[i];

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
index 2426550..577471a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
 import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 
@@ -43,12 +48,22 @@ public class ValueArrayFactory {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls) {
-		if (IntValue.class.isAssignableFrom(cls)) {
+		if (ByteValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new ByteValueArray();
+		} else if (CharValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new CharValueArray();
+		} else if (DoubleValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new DoubleValueArray();
+		} else if (FloatValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new FloatValueArray();
+		} else if (IntValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new IntValueArray();
 		} else if (LongValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new LongValueArray();
 		} else if (NullValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new NullValueArray();
+		} else if (ShortValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new ShortValueArray();
 		} else if (StringValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new StringValueArray();
 		} else {
@@ -66,12 +81,22 @@ public class ValueArrayFactory {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls, int bytes) {
-		if (IntValue.class.isAssignableFrom(cls)) {
+		if (ByteValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new ByteValueArray(bytes);
+		} else if (CharValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new CharValueArray(bytes);
+		} else if (DoubleValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new DoubleValueArray(bytes);
+		} else if (FloatValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new FloatValueArray(bytes);
+		} else if (IntValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new IntValueArray(bytes);
 		} else if (LongValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new LongValueArray(bytes);
 		} else if (NullValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new NullValueArray(bytes);
+		} else if (ShortValue.class.isAssignableFrom(cls)) {
+			return (ValueArray<T>) new ShortValueArray(bytes);
 		} else if (StringValue.class.isAssignableFrom(cls)) {
 			return (ValueArray<T>) new StringValueArray(bytes);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
index 4ba8e39..7d3d3e1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
@@ -25,9 +25,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
@@ -44,6 +49,7 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 
 	private static final long serialVersionUID = 1L;
 
+	public static final ValueArrayTypeInfo<ByteValue> BYTE_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.BYTE_VALUE_TYPE_INFO);
 	public static final ValueArrayTypeInfo<IntValue> INT_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.INT_VALUE_TYPE_INFO);
 	public static final ValueArrayTypeInfo<LongValue> LONG_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.LONG_VALUE_TYPE_INFO);
 	public static final ValueArrayTypeInfo<NullValue> NULL_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.NULL_VALUE_TYPE_INFO);
@@ -96,12 +102,22 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 	public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) {
 		Preconditions.checkNotNull(type, "TypeInformation type class is required");
 
-		if (IntValue.class.isAssignableFrom(type)) {
+		if (ByteValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new ByteValueArraySerializer();
+		} else if (CharValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new CharValueArraySerializer();
+		} else if (DoubleValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new DoubleValueArraySerializer();
+		} else if (FloatValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new FloatValueArraySerializer();
+		} else if (IntValue.class.isAssignableFrom(type)) {
 			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer();
 		} else if (LongValue.class.isAssignableFrom(type)) {
 			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new LongValueArraySerializer();
 		} else if (NullValue.class.isAssignableFrom(type)) {
 			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new NullValueArraySerializer();
+		} else if (ShortValue.class.isAssignableFrom(type)) {
+			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new ShortValueArraySerializer();
 		} else if (StringValue.class.isAssignableFrom(type)) {
 			return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new StringValueArraySerializer();
 		} else {
@@ -114,12 +130,22 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
 	public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
 		Preconditions.checkNotNull(type, "TypeInformation type class is required");
 
-		if (IntValue.class.isAssignableFrom(type)) {
+		if (ByteValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new ByteValueArrayComparator(sortOrderAscending);
+		} else if (CharValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new CharValueArrayComparator(sortOrderAscending);
+		} else if (DoubleValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new DoubleValueArrayComparator(sortOrderAscending);
+		} else if (FloatValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new FloatValueArrayComparator(sortOrderAscending);
+		} else if (IntValue.class.isAssignableFrom(type)) {
 			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending);
 		} else if (LongValue.class.isAssignableFrom(type)) {
 			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new LongValueArrayComparator(sortOrderAscending);
 		} else if (NullValue.class.isAssignableFrom(type)) {
 			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new NullValueArrayComparator(sortOrderAscending);
+		} else if (ShortValue.class.isAssignableFrom(type)) {
+			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new ShortValueArrayComparator(sortOrderAscending);
 		} else if (StringValue.class.isAssignableFrom(type)) {
 			return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new StringValueArrayComparator(sortOrderAscending);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
new file mode 100644
index 0000000..2baf61c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * Tests for {@link ByteValueArrayComparator}.
+ */
+public class ByteValueArrayComparatorTest extends ComparatorTestBase<ByteValueArray> {
+
+	@Override
+	protected TypeComparator<ByteValueArray> createComparator(boolean ascending) {
+		return new ByteValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<ByteValueArray> createSerializer() {
+		return new ByteValueArraySerializer();
+	}
+
+	@Override
+	protected ByteValueArray[] getSortedTestData() {
+		ByteValueArray lva0 = new ByteValueArray();
+
+		ByteValueArray lva1 = new ByteValueArray();
+		lva1.add(new ByteValue((byte) 5));
+
+		ByteValueArray lva2 = new ByteValueArray();
+		lva2.add(new ByteValue((byte) 5));
+		lva2.add(new ByteValue((byte) 10));
+
+		return new ByteValueArray[]{ lva0, lva1 };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
new file mode 100644
index 0000000..4a29318
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link ByteValueArraySerializer}.
+ */
+public class ByteValueArraySerializerTest extends SerializerTestBase<ByteValueArray> {
+
+	@Override
+	protected TypeSerializer<ByteValueArray> createSerializer() {
+		return new ByteValueArraySerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<ByteValueArray> getTypeClass() {
+		return ByteValueArray.class;
+	}
+
+	@Override
+	protected ByteValueArray[] getTestData() {
+		int defaultElements = ByteValueArray.DEFAULT_CAPACITY_IN_BYTES / ByteValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		Random rnd = new Random(874597969123412341L);
+		int rndLong = rnd.nextInt();
+
+		ByteValueArray lva0 = new ByteValueArray();
+
+		ByteValueArray lva1 = new ByteValueArray();
+		lva1.addAll(lva0);
+		lva1.add(new ByteValue((byte) 0));
+
+		ByteValueArray lva2 = new ByteValueArray();
+		lva2.addAll(lva1);
+		lva2.add(new ByteValue((byte) 1));
+
+		ByteValueArray lva3 = new ByteValueArray();
+		lva3.addAll(lva2);
+		lva3.add(new ByteValue((byte) -1));
+
+		ByteValueArray lva4 = new ByteValueArray();
+		lva4.addAll(lva3);
+		lva4.add(new ByteValue(Byte.MAX_VALUE));
+
+		ByteValueArray lva5 = new ByteValueArray();
+		lva5.addAll(lva4);
+		lva5.add(new ByteValue(Byte.MIN_VALUE));
+
+		ByteValueArray lva6 = new ByteValueArray();
+		lva6.addAll(lva5);
+		lva6.add(new ByteValue((byte) rndLong));
+
+		ByteValueArray lva7 = new ByteValueArray();
+		lva7.addAll(lva6);
+		lva7.add(new ByteValue((byte) -rndLong));
+
+		ByteValueArray lva8 = new ByteValueArray();
+		lva8.addAll(lva7);
+		for (int i = 0; i < 1.5 * defaultElements; i++) {
+			lva8.add(new ByteValue((byte) i));
+		}
+		lva8.addAll(lva8);
+
+		return new ByteValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
new file mode 100644
index 0000000..438abec
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.ByteValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ByteValueArray}.
+ */
+public class ByteValueArrayTest {
+
+	@Test
+	public void testBoundedArray() {
+		int count = ByteValueArray.DEFAULT_CAPACITY_IN_BYTES / ByteValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		ValueArray<ByteValue> lva = new ByteValueArray(ByteValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new ByteValue((byte) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array is now full
+		assertTrue(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (ByteValue lv : lva) {
+			assertEquals((byte) idx++, lv.getValue());
+		}
+
+		// add element past end of array
+		assertFalse(lva.add(new ByteValue((byte) count)));
+		assertFalse(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		ByteValueArray lvaTo = new ByteValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+
+	@Test
+	public void testUnboundedArray() {
+		int count = 4096;
+
+		ValueArray<ByteValue> lva = new ByteValueArray();
+
+		// add several elements
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new ByteValue((byte) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array never fills
+		assertFalse(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (ByteValue lv : lva) {
+			assertEquals((byte) idx++, lv.getValue());
+		}
+
+		// add element past end of array
+		assertTrue(lva.add(new ByteValue((byte) count)));
+		assertTrue(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		ByteValueArray lvaTo = new ByteValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test mark/reset
+		int size = lva.size();
+		lva.mark();
+		assertTrue(lva.add(new ByteValue()));
+		assertEquals(size + 1, lva.size());
+		lva.reset();
+		assertEquals(size, lva.size());
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
new file mode 100644
index 0000000..8f06e70
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * Tests for {@link CharValueArrayComparator}.
+ */
+public class CharValueArrayComparatorTest extends ComparatorTestBase<CharValueArray> {
+
+	@Override
+	protected TypeComparator<CharValueArray> createComparator(boolean ascending) {
+		return new CharValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<CharValueArray> createSerializer() {
+		return new CharValueArraySerializer();
+	}
+
+	@Override
+	protected CharValueArray[] getSortedTestData() {
+		CharValueArray lva0 = new CharValueArray();
+
+		CharValueArray lva1 = new CharValueArray();
+		lva1.add(new CharValue((char) 5));
+
+		CharValueArray lva2 = new CharValueArray();
+		lva2.add(new CharValue((char) 5));
+		lva2.add(new CharValue((char) 10));
+
+		return new CharValueArray[]{ lva0, lva1 };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
new file mode 100644
index 0000000..86ee10d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link CharValueArraySerializer}.
+ */
+public class CharValueArraySerializerTest extends SerializerTestBase<CharValueArray> {
+
+	@Override
+	protected TypeSerializer<CharValueArray> createSerializer() {
+		return new CharValueArraySerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<CharValueArray> getTypeClass() {
+		return CharValueArray.class;
+	}
+
+	@Override
+	protected CharValueArray[] getTestData() {
+		int defaultElements = CharValueArray.DEFAULT_CAPACITY_IN_BYTES / CharValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		Random rnd = new Random(874597969123412341L);
+		int rndLong = rnd.nextInt();
+
+		CharValueArray lva0 = new CharValueArray();
+
+		CharValueArray lva1 = new CharValueArray();
+		lva1.addAll(lva0);
+		lva1.add(new CharValue((char) 0));
+
+		CharValueArray lva2 = new CharValueArray();
+		lva2.addAll(lva1);
+		lva2.add(new CharValue((char) 1));
+
+		CharValueArray lva3 = new CharValueArray();
+		lva3.addAll(lva2);
+		lva3.add(new CharValue((char) -1));
+
+		CharValueArray lva4 = new CharValueArray();
+		lva4.addAll(lva3);
+		lva4.add(new CharValue(Character.MAX_VALUE));
+
+		CharValueArray lva5 = new CharValueArray();
+		lva5.addAll(lva4);
+		lva5.add(new CharValue(Character.MIN_VALUE));
+
+		CharValueArray lva6 = new CharValueArray();
+		lva6.addAll(lva5);
+		lva6.add(new CharValue((char) rndLong));
+
+		CharValueArray lva7 = new CharValueArray();
+		lva7.addAll(lva6);
+		lva7.add(new CharValue((char) -rndLong));
+
+		CharValueArray lva8 = new CharValueArray();
+		lva8.addAll(lva7);
+		for (int i = 0; i < 1.5 * defaultElements; i++) {
+			lva8.add(new CharValue((char) i));
+		}
+		lva8.addAll(lva8);
+
+		return new CharValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
new file mode 100644
index 0000000..3205d42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StringValueArray}.
+ */
+public class CharValueArrayTest {
+
+	@Test
+	public void testBoundedArray() {
+		// one byte for length and one byte for character
+		int count = StringValueArray.DEFAULT_CAPACITY_IN_BYTES / 2;
+
+		ValueArray<StringValue> sva = new StringValueArray(StringValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(sva.isFull());
+			assertEquals(i, sva.size());
+
+			assertTrue(sva.add(new StringValue(Character.toString((char) (i & 0x7F)))));
+
+			assertEquals(i + 1, sva.size());
+		}
+
+		// array is now full
+		assertTrue(sva.isFull());
+		assertEquals(count, sva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (StringValue sv : sva) {
+			assertEquals((idx++) & 0x7F, sv.getValue().charAt(0));
+		}
+
+		// add element past end of array
+		assertFalse(sva.add(new StringValue(String.valueOf((char) count))));
+		assertFalse(sva.addAll(sva));
+
+		// test copy
+		assertEquals(sva, sva.copy());
+
+		// test copyTo
+		StringValueArray svaTo = new StringValueArray();
+		sva.copyTo(svaTo);
+		assertEquals(sva, svaTo);
+
+		// test clear
+		sva.clear();
+		assertEquals(0, sva.size());
+	}
+
+	@Test
+	public void testBoundedArrayWithVariableLengthCharacters() {
+		// characters alternatingly take 1 and 2 bytes (plus one byte for length)
+		int count = 1280;
+
+		ValueArray<StringValue> sva = new StringValueArray(3200);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(sva.isFull());
+			assertEquals(i, sva.size());
+
+			assertTrue(sva.add(new StringValue(Character.toString((char) (i & 0xFF)))));
+
+			assertEquals(i + 1, sva.size());
+		}
+
+		// array is now full
+		assertTrue(sva.isFull());
+		assertEquals(count, sva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (StringValue sv : sva) {
+			assertEquals((idx++) & 0xFF, sv.getValue().charAt(0));
+		}
+
+		// add element past end of array
+		assertFalse(sva.add(new StringValue(String.valueOf((char) count))));
+		assertFalse(sva.addAll(sva));
+
+		// test copy
+		assertEquals(sva, sva.copy());
+
+		// test copyTo
+		StringValueArray svaTo = new StringValueArray();
+		sva.copyTo(svaTo);
+		assertEquals(sva, svaTo);
+
+		// test clear
+		sva.clear();
+		assertEquals(0, sva.size());
+	}
+
+	@Test
+	public void testUnboundedArray() {
+		int count = 4096;
+
+		ValueArray<StringValue> sva = new StringValueArray();
+
+		// add several elements
+		for (int i = 0; i < count; i++) {
+			assertFalse(sva.isFull());
+			assertEquals(i, sva.size());
+
+			assertTrue(sva.add(new StringValue(String.valueOf((char) i))));
+
+			assertEquals(i + 1, sva.size());
+		}
+
+		// array never fills
+		assertFalse(sva.isFull());
+		assertEquals(count, sva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (StringValue sv : sva) {
+			assertEquals(idx++, sv.getValue().charAt(0));
+		}
+
+		// add element past end of array
+		assertTrue(sva.add(new StringValue(String.valueOf((char) count))));
+		assertTrue(sva.addAll(sva));
+
+		// test copy
+		assertEquals(sva, sva.copy());
+
+		// test copyTo
+		StringValueArray svaTo = new StringValueArray();
+		sva.copyTo(svaTo);
+		assertEquals(sva, svaTo);
+
+		// test mark/reset
+		int size = sva.size();
+		sva.mark();
+		assertTrue(sva.add(new StringValue()));
+		assertEquals(size + 1, sva.size());
+		sva.reset();
+		assertEquals(size, sva.size());
+
+		// test clear
+		sva.clear();
+		assertEquals(0, sva.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
new file mode 100644
index 0000000..29ae4c9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * Tests for {@link DoubleValueArrayComparator}.
+ */
+public class DoubleValueArrayComparatorTest extends ComparatorTestBase<DoubleValueArray> {
+
+	@Override
+	protected TypeComparator<DoubleValueArray> createComparator(boolean ascending) {
+		return new DoubleValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<DoubleValueArray> createSerializer() {
+		return new DoubleValueArraySerializer();
+	}
+
+	@Override
+	protected DoubleValueArray[] getSortedTestData() {
+		DoubleValueArray lva0 = new DoubleValueArray();
+
+		DoubleValueArray lva1 = new DoubleValueArray();
+		lva1.add(new DoubleValue(5));
+
+		DoubleValueArray lva2 = new DoubleValueArray();
+		lva2.add(new DoubleValue(5));
+		lva2.add(new DoubleValue(10));
+
+		return new DoubleValueArray[]{ lva0, lva1 };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
new file mode 100644
index 0000000..49c1f65
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link DoubleValueArraySerializer}.
+ */
+public class DoubleValueArraySerializerTest extends SerializerTestBase<DoubleValueArray> {
+
+	@Override
+	protected TypeSerializer<DoubleValueArray> createSerializer() {
+		return new DoubleValueArraySerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<DoubleValueArray> getTypeClass() {
+		return DoubleValueArray.class;
+	}
+
+	@Override
+	protected DoubleValueArray[] getTestData() {
+		int defaultElements = DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES / DoubleValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		Random rnd = new Random(874597969123412341L);
+		int rndLong = rnd.nextInt();
+
+		DoubleValueArray lva0 = new DoubleValueArray();
+
+		DoubleValueArray lva1 = new DoubleValueArray();
+		lva1.addAll(lva0);
+		lva1.add(new DoubleValue(0));
+
+		DoubleValueArray lva2 = new DoubleValueArray();
+		lva2.addAll(lva1);
+		lva2.add(new DoubleValue(1));
+
+		DoubleValueArray lva3 = new DoubleValueArray();
+		lva3.addAll(lva2);
+		lva3.add(new DoubleValue(-1));
+
+		DoubleValueArray lva4 = new DoubleValueArray();
+		lva4.addAll(lva3);
+		lva4.add(new DoubleValue(Double.MAX_VALUE));
+
+		DoubleValueArray lva5 = new DoubleValueArray();
+		lva5.addAll(lva4);
+		lva5.add(new DoubleValue(Double.MIN_VALUE));
+
+		DoubleValueArray lva6 = new DoubleValueArray();
+		lva6.addAll(lva5);
+		lva6.add(new DoubleValue(rndLong));
+
+		DoubleValueArray lva7 = new DoubleValueArray();
+		lva7.addAll(lva6);
+		lva7.add(new DoubleValue(-rndLong));
+
+		DoubleValueArray lva8 = new DoubleValueArray();
+		lva8.addAll(lva7);
+		for (int i = 0; i < 1.5 * defaultElements; i++) {
+			lva8.add(new DoubleValue(i));
+		}
+		lva8.addAll(lva8);
+
+		return new DoubleValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
new file mode 100644
index 0000000..e545ba1
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.DoubleValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DoubleValueArray}.
+ */
+public class DoubleValueArrayTest {
+
+	@Test
+	public void testBoundedArray() {
+		int count = DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES / DoubleValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		ValueArray<DoubleValue> lva = new DoubleValueArray(DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new DoubleValue(i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array is now full
+		assertTrue(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (DoubleValue lv : lva) {
+			assertEquals(idx++, lv.getValue(), 0.000001);
+		}
+
+		// add element past end of array
+		assertFalse(lva.add(new DoubleValue(count)));
+		assertFalse(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		DoubleValueArray lvaTo = new DoubleValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+
+	@Test
+	public void testUnboundedArray() {
+		int count = 4096;
+
+		ValueArray<DoubleValue> lva = new DoubleValueArray();
+
+		// add several elements
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new DoubleValue(i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array never fills
+		assertFalse(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (DoubleValue lv : lva) {
+			assertEquals(idx++, lv.getValue(), 0.000001);
+		}
+
+		// add element past end of array
+		assertTrue(lva.add(new DoubleValue(count)));
+		assertTrue(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		DoubleValueArray lvaTo = new DoubleValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test mark/reset
+		int size = lva.size();
+		lva.mark();
+		assertTrue(lva.add(new DoubleValue()));
+		assertEquals(size + 1, lva.size());
+		lva.reset();
+		assertEquals(size, lva.size());
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
new file mode 100644
index 0000000..9651072
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+/**
+ * Tests for {@link FloatValueArrayComparator}.
+ */
+public class FloatValueArrayComparatorTest extends ComparatorTestBase<FloatValueArray> {
+
+	@Override
+	protected TypeComparator<FloatValueArray> createComparator(boolean ascending) {
+		return new FloatValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<FloatValueArray> createSerializer() {
+		return new FloatValueArraySerializer();
+	}
+
+	@Override
+	protected FloatValueArray[] getSortedTestData() {
+		FloatValueArray lva0 = new FloatValueArray();
+
+		FloatValueArray lva1 = new FloatValueArray();
+		lva1.add(new FloatValue(5));
+
+		FloatValueArray lva2 = new FloatValueArray();
+		lva2.add(new FloatValue(5));
+		lva2.add(new FloatValue(10));
+
+		return new FloatValueArray[]{ lva0, lva1 };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
new file mode 100644
index 0000000..14312c3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link FloatValueArraySerializer}.
+ */
+public class FloatValueArraySerializerTest extends SerializerTestBase<FloatValueArray> {
+
+	@Override
+	protected TypeSerializer<FloatValueArray> createSerializer() {
+		return new FloatValueArraySerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<FloatValueArray> getTypeClass() {
+		return FloatValueArray.class;
+	}
+
+	@Override
+	protected FloatValueArray[] getTestData() {
+		int defaultElements = FloatValueArray.DEFAULT_CAPACITY_IN_BYTES / FloatValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		Random rnd = new Random(874597969123412341L);
+		int rndLong = rnd.nextInt();
+
+		FloatValueArray lva0 = new FloatValueArray();
+
+		FloatValueArray lva1 = new FloatValueArray();
+		lva1.addAll(lva0);
+		lva1.add(new FloatValue(0));
+
+		FloatValueArray lva2 = new FloatValueArray();
+		lva2.addAll(lva1);
+		lva2.add(new FloatValue(1));
+
+		FloatValueArray lva3 = new FloatValueArray();
+		lva3.addAll(lva2);
+		lva3.add(new FloatValue(-1));
+
+		FloatValueArray lva4 = new FloatValueArray();
+		lva4.addAll(lva3);
+		lva4.add(new FloatValue(Float.MAX_VALUE));
+
+		FloatValueArray lva5 = new FloatValueArray();
+		lva5.addAll(lva4);
+		lva5.add(new FloatValue(Float.MIN_VALUE));
+
+		FloatValueArray lva6 = new FloatValueArray();
+		lva6.addAll(lva5);
+		lva6.add(new FloatValue(rndLong));
+
+		FloatValueArray lva7 = new FloatValueArray();
+		lva7.addAll(lva6);
+		lva7.add(new FloatValue(-rndLong));
+
+		FloatValueArray lva8 = new FloatValueArray();
+		lva8.addAll(lva7);
+		for (int i = 0; i < 1.5 * defaultElements; i++) {
+			lva8.add(new FloatValue(i));
+		}
+		lva8.addAll(lva8);
+
+		return new FloatValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
new file mode 100644
index 0000000..5bab761
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.FloatValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FloatValueArray}.
+ */
+public class FloatValueArrayTest {
+
+	@Test
+	public void testBoundedArray() {
+		int count = FloatValueArray.DEFAULT_CAPACITY_IN_BYTES / FloatValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		ValueArray<FloatValue> lva = new FloatValueArray(FloatValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new FloatValue((byte) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array is now full
+		assertTrue(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (FloatValue lv : lva) {
+			assertEquals((byte) idx++, lv.getValue(), 0.000001);
+		}
+
+		// add element past end of array
+		assertFalse(lva.add(new FloatValue((byte) count)));
+		assertFalse(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		FloatValueArray lvaTo = new FloatValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+
+	@Test
+	public void testUnboundedArray() {
+		int count = 4096;
+
+		ValueArray<FloatValue> lva = new FloatValueArray();
+
+		// add several elements
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new FloatValue((byte) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array never fills
+		assertFalse(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (FloatValue lv : lva) {
+			assertEquals((byte) idx++, lv.getValue(), 0.000001);
+		}
+
+		// add element past end of array
+		assertTrue(lva.add(new FloatValue((byte) count)));
+		assertTrue(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		FloatValueArray lvaTo = new FloatValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test mark/reset
+		int size = lva.size();
+		lva.mark();
+		assertTrue(lva.add(new FloatValue()));
+		assertEquals(size + 1, lva.size());
+		lva.reset();
+		assertEquals(size, lva.size());
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
index e76b840..8774249 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
@@ -46,8 +46,8 @@ public class LongValueArrayComparatorTest extends ComparatorTestBase<LongValueAr
 		lva1.add(new LongValue(5));
 
 		LongValueArray lva2 = new LongValueArray();
-		lva2.add(new LongValue(50));
-		lva2.add(new LongValue(100));
+		lva2.add(new LongValue(5));
+		lva2.add(new LongValue(10));
 
 		return new LongValueArray[]{ lva0, lva1, lva2 };
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
new file mode 100644
index 0000000..1ceab4b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * Tests for {@link ShortValueArrayComparator}.
+ */
+public class ShortValueArrayComparatorTest extends ComparatorTestBase<ShortValueArray> {
+
+	@Override
+	protected TypeComparator<ShortValueArray> createComparator(boolean ascending) {
+		return new ShortValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<ShortValueArray> createSerializer() {
+		return new ShortValueArraySerializer();
+	}
+
+	@Override
+	protected ShortValueArray[] getSortedTestData() {
+		ShortValueArray lva0 = new ShortValueArray();
+
+		ShortValueArray lva1 = new ShortValueArray();
+		lva1.add(new ShortValue((short) 5));
+
+		ShortValueArray lva2 = new ShortValueArray();
+		lva2.add(new ShortValue((short) 5));
+		lva2.add(new ShortValue((short) 10));
+
+		return new ShortValueArray[]{ lva0, lva1 };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
new file mode 100644
index 0000000..005f4d1
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link ShortValueArraySerializer}.
+ */
+public class ShortValueArraySerializerTest extends SerializerTestBase<ShortValueArray> {
+
+	@Override
+	protected TypeSerializer<ShortValueArray> createSerializer() {
+		return new ShortValueArraySerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<ShortValueArray> getTypeClass() {
+		return ShortValueArray.class;
+	}
+
+	@Override
+	protected ShortValueArray[] getTestData() {
+		int defaultElements = ShortValueArray.DEFAULT_CAPACITY_IN_BYTES / ShortValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		Random rnd = new Random(874597969123412341L);
+		int rndLong = rnd.nextInt();
+
+		ShortValueArray lva0 = new ShortValueArray();
+
+		ShortValueArray lva1 = new ShortValueArray();
+		lva1.addAll(lva0);
+		lva1.add(new ShortValue((short) 0));
+
+		ShortValueArray lva2 = new ShortValueArray();
+		lva2.addAll(lva1);
+		lva2.add(new ShortValue((short) 1));
+
+		ShortValueArray lva3 = new ShortValueArray();
+		lva3.addAll(lva2);
+		lva3.add(new ShortValue((short) -1));
+
+		ShortValueArray lva4 = new ShortValueArray();
+		lva4.addAll(lva3);
+		lva4.add(new ShortValue(Short.MAX_VALUE));
+
+		ShortValueArray lva5 = new ShortValueArray();
+		lva5.addAll(lva4);
+		lva5.add(new ShortValue(Short.MIN_VALUE));
+
+		ShortValueArray lva6 = new ShortValueArray();
+		lva6.addAll(lva5);
+		lva6.add(new ShortValue((short) rndLong));
+
+		ShortValueArray lva7 = new ShortValueArray();
+		lva7.addAll(lva6);
+		lva7.add(new ShortValue((short) -rndLong));
+
+		ShortValueArray lva8 = new ShortValueArray();
+		lva8.addAll(lva7);
+		for (int i = 0; i < 1.5 * defaultElements; i++) {
+			lva8.add(new ShortValue((short) i));
+		}
+		lva8.addAll(lva8);
+
+		return new ShortValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
new file mode 100644
index 0000000..eed1a28
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.ShortValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ShortValueArray}.
+ */
+public class ShortValueArrayTest {
+
+	@Test
+	public void testBoundedArray() {
+		int count = ShortValueArray.DEFAULT_CAPACITY_IN_BYTES / ShortValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+		ValueArray<ShortValue> lva = new ShortValueArray(ShortValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+		// fill the array
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new ShortValue((short) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array is now full
+		assertTrue(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (ShortValue lv : lva) {
+			assertEquals((short) idx++, lv.getValue());
+		}
+
+		// add element past end of array
+		assertFalse(lva.add(new ShortValue((short) count)));
+		assertFalse(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		ShortValueArray lvaTo = new ShortValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+
+	@Test
+	public void testUnboundedArray() {
+		int count = 4096;
+
+		ValueArray<ShortValue> lva = new ShortValueArray();
+
+		// add several elements
+		for (int i = 0; i < count; i++) {
+			assertFalse(lva.isFull());
+			assertEquals(i, lva.size());
+
+			assertTrue(lva.add(new ShortValue((short) i)));
+
+			assertEquals(i + 1, lva.size());
+		}
+
+		// array never fills
+		assertFalse(lva.isFull());
+		assertEquals(count, lva.size());
+
+		// verify the array values
+		int idx = 0;
+		for (ShortValue lv : lva) {
+			assertEquals((short) idx++, lv.getValue());
+		}
+
+		// add element past end of array
+		assertTrue(lva.add(new ShortValue((short) count)));
+		assertTrue(lva.addAll(lva));
+
+		// test copy
+		assertEquals(lva, lva.copy());
+
+		// test copyTo
+		ShortValueArray lvaTo = new ShortValueArray();
+		lva.copyTo(lvaTo);
+		assertEquals(lva, lvaTo);
+
+		// test mark/reset
+		int size = lva.size();
+		lva.mark();
+		assertTrue(lva.add(new ShortValue()));
+		assertEquals(size + 1, lva.size());
+		lva.reset();
+		assertEquals(size, lva.size());
+
+		// test clear
+		lva.clear();
+		assertEquals(0, lva.size());
+	}
+}