You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/05 20:53:08 UTC
[3/6] flink git commit: [FLINK-7023] [gelly] Remaining types for
Gelly ValueArrays
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
new file mode 100644
index 0000000..aa99989
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArray.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link ShortValue}.
+ */
+public class ShortValueArray
+implements ValueArray<ShortValue> {
+
+ protected static final int ELEMENT_LENGTH_IN_BYTES = 2;
+
+ protected static final int DEFAULT_CAPACITY_IN_BYTES = 1024;
+
+ // see note in ArrayList, HashTable, ...
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ private boolean isBounded;
+
+ private short[] data;
+
+ // the number of elements currently stored
+ private int position;
+
+ // location of the bookmark used by mark() and reset()
+ private transient int mark;
+
+ // hasher used to generate the normalized key
+ private MurmurHash hash = new MurmurHash(0xb3148e81);
+
+ // hash result stored as normalized key
+ private IntValue hashValue = new IntValue();
+
+ /**
+ * Initializes an expandable array with default capacity.
+ */
+ public ShortValueArray() {
+ isBounded = false;
+ initialize(DEFAULT_CAPACITY_IN_BYTES);
+ }
+
+ /**
+ * Initializes a fixed-size array with the provided number of shorts.
+ *
+ * @param bytes number of bytes of the encapsulated array
+ */
+ public ShortValueArray(int bytes) {
+ isBounded = true;
+ initialize(bytes);
+ }
+
+ /**
+ * Initializes the array with the provided number of bytes.
+ *
+ * @param bytes initial size of the encapsulated array in bytes
+ */
+ private void initialize(int bytes) {
+ int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+ Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+ Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+ data = new short[capacity];
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * If the size of the array is insufficient to hold the given capacity then
+ * copy the array into a new, larger array.
+ *
+ * @param minCapacity minimum required number of elements
+ */
+ private void ensureCapacity(int minCapacity) {
+ long currentCapacity = data.length;
+
+ if (minCapacity <= currentCapacity) {
+ return;
+ }
+
+ // increase capacity by at least ~50%
+ long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+ int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+ if (newCapacity < minCapacity) {
+ // throw exception as unbounded arrays are not expected to fill
+ throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+ }
+
+ data = Arrays.copyOf(data, newCapacity);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ for (int idx = 0; idx < this.position; idx++) {
+ sb.append(data[idx]);
+ if (idx < position - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Iterable
+ // --------------------------------------------------------------------------------------------
+
+ private final ReadIterator iterator = new ReadIterator();
+
+ @Override
+ public Iterator<ShortValue> iterator() {
+ iterator.reset();
+ return iterator;
+ }
+
+ private class ReadIterator
+ implements Iterator<ShortValue> {
+ private ShortValue value = new ShortValue();
+
+ private int pos;
+
+ @Override
+ public boolean hasNext() {
+ return pos < position;
+ }
+
+ @Override
+ public ShortValue next() {
+ value.setValue(data[pos++]);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // IOReadableWritable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(position);
+
+ for (int i = 0; i < position; i++) {
+ out.writeShort(data[i]);
+ }
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ position = in.readInt();
+ mark = 0;
+
+ ensureCapacity(position);
+
+ for (int i = 0; i < position; i++) {
+ data[i] = in.readShort();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // NormalizableKey
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getMaxNormalizedKeyLen() {
+ return hashValue.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+ hash.reset();
+
+ hash.hash(position);
+ for (int i = 0; i < position; i++) {
+ hash.hash(data[i]);
+ }
+
+ hashValue.setValue(hash.hash());
+ hashValue.copyNormalizedKey(target, offset, len);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparable
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int compareTo(ValueArray<ShortValue> o) {
+ ShortValueArray other = (ShortValueArray) o;
+
+ int min = Math.min(position, other.position);
+ for (int i = 0; i < min; i++) {
+ int cmp = Short.compare(data[i], other.data[i]);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return Integer.compare(position, other.position);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Key
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int hash = 0;
+
+ for (int i = 0; i < position; i++) {
+ hash = 31 * hash + data[i];
+ }
+
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ShortValueArray) {
+ ShortValueArray other = (ShortValueArray) obj;
+
+ if (position != other.position) {
+ return false;
+ }
+
+ for (int i = 0; i < position; i++) {
+ if (data[i] != other.data[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ResettableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void setValue(ValueArray<ShortValue> value) {
+ value.copyTo(this);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // CopyableValue
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int getBinaryLength() {
+ return -1;
+ }
+
+ @Override
+ public void copyTo(ValueArray<ShortValue> target) {
+ ShortValueArray other = (ShortValueArray) target;
+
+ other.position = position;
+ other.mark = mark;
+
+ other.ensureCapacity(position);
+ System.arraycopy(data, 0, other.data, 0, position);
+ }
+
+ @Override
+ public ValueArray<ShortValue> copy() {
+ ValueArray<ShortValue> copy = new ShortValueArray();
+
+ this.copyTo(copy);
+
+ return copy;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ copyInternal(source, target);
+ }
+
+ protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+ int count = source.readInt();
+ target.writeInt(count);
+
+ int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+ target.write(source, bytes);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // ValueArray
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int size() {
+ return position;
+ }
+
+ @Override
+ public boolean isFull() {
+ if (isBounded) {
+ return position == data.length;
+ } else {
+ return position == MAX_ARRAY_SIZE;
+ }
+ }
+
+ @Override
+ public boolean add(ShortValue value) {
+ int newPosition = position + 1;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ data[position] = value.getValue();
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(ValueArray<ShortValue> other) {
+ ShortValueArray source = (ShortValueArray) other;
+
+ int sourceSize = source.position;
+ int newPosition = position + sourceSize;
+
+ if (newPosition > data.length) {
+ if (isBounded) {
+ return false;
+ } else {
+ ensureCapacity(newPosition);
+ }
+ }
+
+ System.arraycopy(source.data, 0, data, position, sourceSize);
+ position = newPosition;
+
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ position = 0;
+ }
+
+ @Override
+ public void mark() {
+ mark = position;
+ }
+
+ @Override
+ public void reset() {
+ position = mark;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
new file mode 100644
index 0000000..6ebbeaa
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for ShortValueArray based on CopyableValueComparator.
+ *
+ * <p>This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class ShortValueArrayComparator extends TypeComparator<ShortValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean ascendingComparison;
+
+ private final ShortValueArray reference = new ShortValueArray();
+
+ private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+ public ShortValueArrayComparator(boolean ascending) {
+ this.ascendingComparison = ascending;
+ }
+
+ @Override
+ public int hash(ShortValueArray record) {
+ return record.hashCode();
+ }
+
+ @Override
+ public void setReference(ShortValueArray toCompare) {
+ toCompare.copyTo(reference);
+ }
+
+ @Override
+ public boolean equalToReference(ShortValueArray candidate) {
+ return candidate.equals(this.reference);
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<ShortValueArray> referencedComparator) {
+ int comp = ((ShortValueArrayComparator) referencedComparator).reference.compareTo(reference);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compare(ShortValueArray first, ShortValueArray second) {
+ int comp = first.compareTo(second);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+
+ int minCount = Math.min(firstCount, secondCount);
+ while (minCount-- > 0) {
+ short firstValue = firstSource.readShort();
+ short secondValue = secondSource.readShort();
+
+ int cmp = Short.compare(firstValue, secondValue);
+ if (cmp != 0) {
+ return ascendingComparison ? cmp : -cmp;
+ }
+ }
+
+ int cmp = Integer.compare(firstCount, secondCount);
+ return ascendingComparison ? cmp : -cmp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return NormalizableKey.class.isAssignableFrom(ShortValueArray.class);
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return reference.getMaxNormalizedKeyLen();
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyShorts) {
+ return keyShorts < getNormalizeKeyLen();
+ }
+
+ @Override
+ public void putNormalizedKey(ShortValueArray record, MemorySegment target, int offset, int numShorts) {
+ record.copyNormalizedKey(target, offset, numShorts);
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascendingComparison;
+ }
+
+ @Override
+ public TypeComparator<ShortValueArray> duplicate() {
+ return new ShortValueArrayComparator(ascendingComparison);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator<?>[] getFlatComparators() {
+ return comparators;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // unsupported normalization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(ShortValueArray record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ShortValueArray readWithKeyDenormalization(ShortValueArray reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
new file mode 100644
index 0000000..bce4d81
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code ShortValueArray}.
+ */
+public final class ShortValueArraySerializer extends TypeSerializerSingleton<ShortValueArray> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public ShortValueArray createInstance() {
+ return new ShortValueArray();
+ }
+
+ @Override
+ public ShortValueArray copy(ShortValueArray from) {
+ return copy(from, new ShortValueArray());
+ }
+
+ @Override
+ public ShortValueArray copy(ShortValueArray from, ShortValueArray reuse) {
+ reuse.setValue(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(ShortValueArray record, DataOutputView target) throws IOException {
+ record.write(target);
+ }
+
+ @Override
+ public ShortValueArray deserialize(DataInputView source) throws IOException {
+ return deserialize(new ShortValueArray(), source);
+ }
+
+ @Override
+ public ShortValueArray deserialize(ShortValueArray reuse, DataInputView source) throws IOException {
+ reuse.read(source);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ ShortValueArray.copyInternal(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof ShortValueArraySerializer;
+ }
+
+ @Override
+ protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+ return super.isCompatibleSerializationFormatIdentifier(identifier)
+ || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
index fabe990..0805a3d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -224,7 +224,8 @@ implements ValueArray<StringValue> {
valueData[i] = (char) c;
}
- return value;
+ // cannot prevent allocation of new StringValue!
+ return value.substring(0, len);
}
@Override
@@ -317,7 +318,7 @@ implements ValueArray<StringValue> {
@Override
public int hashCode() {
- int hash = 1;
+ int hash = 0;
for (int i = 0; i < position; i++) {
hash = 31 * hash + data[i];
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
index 2426550..577471a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
@@ -18,10 +18,15 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
@@ -43,12 +48,22 @@ public class ValueArrayFactory {
*/
@SuppressWarnings("unchecked")
public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls) {
- if (IntValue.class.isAssignableFrom(cls)) {
+ if (ByteValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new ByteValueArray();
+ } else if (CharValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new CharValueArray();
+ } else if (DoubleValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new DoubleValueArray();
+ } else if (FloatValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new FloatValueArray();
+ } else if (IntValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new IntValueArray();
} else if (LongValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new LongValueArray();
} else if (NullValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new NullValueArray();
+ } else if (ShortValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new ShortValueArray();
} else if (StringValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new StringValueArray();
} else {
@@ -66,12 +81,22 @@ public class ValueArrayFactory {
*/
@SuppressWarnings("unchecked")
public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls, int bytes) {
- if (IntValue.class.isAssignableFrom(cls)) {
+ if (ByteValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new ByteValueArray(bytes);
+ } else if (CharValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new CharValueArray(bytes);
+ } else if (DoubleValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new DoubleValueArray(bytes);
+ } else if (FloatValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new FloatValueArray(bytes);
+ } else if (IntValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new IntValueArray(bytes);
} else if (LongValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new LongValueArray(bytes);
} else if (NullValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new NullValueArray(bytes);
+ } else if (ShortValue.class.isAssignableFrom(cls)) {
+ return (ValueArray<T>) new ShortValueArray(bytes);
} else if (StringValue.class.isAssignableFrom(cls)) {
return (ValueArray<T>) new StringValueArray(bytes);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
index 4ba8e39..7d3d3e1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
@@ -25,9 +25,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
@@ -44,6 +49,7 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
private static final long serialVersionUID = 1L;
+ public static final ValueArrayTypeInfo<ByteValue> BYTE_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.BYTE_VALUE_TYPE_INFO);
public static final ValueArrayTypeInfo<IntValue> INT_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.INT_VALUE_TYPE_INFO);
public static final ValueArrayTypeInfo<LongValue> LONG_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.LONG_VALUE_TYPE_INFO);
public static final ValueArrayTypeInfo<NullValue> NULL_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.NULL_VALUE_TYPE_INFO);
@@ -96,12 +102,22 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
- if (IntValue.class.isAssignableFrom(type)) {
+ if (ByteValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new ByteValueArraySerializer();
+ } else if (CharValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new CharValueArraySerializer();
+ } else if (DoubleValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new DoubleValueArraySerializer();
+ } else if (FloatValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new FloatValueArraySerializer();
+ } else if (IntValue.class.isAssignableFrom(type)) {
return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer();
} else if (LongValue.class.isAssignableFrom(type)) {
return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new LongValueArraySerializer();
} else if (NullValue.class.isAssignableFrom(type)) {
return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new NullValueArraySerializer();
+ } else if (ShortValue.class.isAssignableFrom(type)) {
+ return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new ShortValueArraySerializer();
} else if (StringValue.class.isAssignableFrom(type)) {
return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new StringValueArraySerializer();
} else {
@@ -114,12 +130,22 @@ public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implem
public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
Preconditions.checkNotNull(type, "TypeInformation type class is required");
- if (IntValue.class.isAssignableFrom(type)) {
+ if (ByteValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new ByteValueArrayComparator(sortOrderAscending);
+ } else if (CharValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new CharValueArrayComparator(sortOrderAscending);
+ } else if (DoubleValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new DoubleValueArrayComparator(sortOrderAscending);
+ } else if (FloatValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new FloatValueArrayComparator(sortOrderAscending);
+ } else if (IntValue.class.isAssignableFrom(type)) {
return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending);
} else if (LongValue.class.isAssignableFrom(type)) {
return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new LongValueArrayComparator(sortOrderAscending);
} else if (NullValue.class.isAssignableFrom(type)) {
return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new NullValueArrayComparator(sortOrderAscending);
+ } else if (ShortValue.class.isAssignableFrom(type)) {
+ return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new ShortValueArrayComparator(sortOrderAscending);
} else if (StringValue.class.isAssignableFrom(type)) {
return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new StringValueArrayComparator(sortOrderAscending);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
new file mode 100644
index 0000000..2baf61c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * Tests for {@link ByteValueArrayComparator}.
+ */
+public class ByteValueArrayComparatorTest extends ComparatorTestBase<ByteValueArray> {
+
+ @Override
+ protected TypeComparator<ByteValueArray> createComparator(boolean ascending) {
+ return new ByteValueArrayComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<ByteValueArray> createSerializer() {
+ return new ByteValueArraySerializer();
+ }
+
+ @Override
+ protected ByteValueArray[] getSortedTestData() {
+ ByteValueArray lva0 = new ByteValueArray();
+
+ ByteValueArray lva1 = new ByteValueArray();
+ lva1.add(new ByteValue((byte) 5));
+
+ ByteValueArray lva2 = new ByteValueArray();
+ lva2.add(new ByteValue((byte) 5));
+ lva2.add(new ByteValue((byte) 10));
+
+ return new ByteValueArray[]{ lva0, lva1 };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
new file mode 100644
index 0000000..4a29318
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link ByteValueArraySerializer}.
+ */
+public class ByteValueArraySerializerTest extends SerializerTestBase<ByteValueArray> {
+
+ @Override
+ protected TypeSerializer<ByteValueArray> createSerializer() {
+ return new ByteValueArraySerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<ByteValueArray> getTypeClass() {
+ return ByteValueArray.class;
+ }
+
+ @Override
+ protected ByteValueArray[] getTestData() {
+ int defaultElements = ByteValueArray.DEFAULT_CAPACITY_IN_BYTES / ByteValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ Random rnd = new Random(874597969123412341L);
+ int rndLong = rnd.nextInt();
+
+ ByteValueArray lva0 = new ByteValueArray();
+
+ ByteValueArray lva1 = new ByteValueArray();
+ lva1.addAll(lva0);
+ lva1.add(new ByteValue((byte) 0));
+
+ ByteValueArray lva2 = new ByteValueArray();
+ lva2.addAll(lva1);
+ lva2.add(new ByteValue((byte) 1));
+
+ ByteValueArray lva3 = new ByteValueArray();
+ lva3.addAll(lva2);
+ lva3.add(new ByteValue((byte) -1));
+
+ ByteValueArray lva4 = new ByteValueArray();
+ lva4.addAll(lva3);
+ lva4.add(new ByteValue(Byte.MAX_VALUE));
+
+ ByteValueArray lva5 = new ByteValueArray();
+ lva5.addAll(lva4);
+ lva5.add(new ByteValue(Byte.MIN_VALUE));
+
+ ByteValueArray lva6 = new ByteValueArray();
+ lva6.addAll(lva5);
+ lva6.add(new ByteValue((byte) rndLong));
+
+ ByteValueArray lva7 = new ByteValueArray();
+ lva7.addAll(lva6);
+ lva7.add(new ByteValue((byte) -rndLong));
+
+ ByteValueArray lva8 = new ByteValueArray();
+ lva8.addAll(lva7);
+ for (int i = 0; i < 1.5 * defaultElements; i++) {
+ lva8.add(new ByteValue((byte) i));
+ }
+ lva8.addAll(lva8);
+
+ return new ByteValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
new file mode 100644
index 0000000..438abec
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ByteValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.ByteValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ByteValueArray}.
+ */
+public class ByteValueArrayTest {
+
+ @Test
+ public void testBoundedArray() {
+ int count = ByteValueArray.DEFAULT_CAPACITY_IN_BYTES / ByteValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ ValueArray<ByteValue> lva = new ByteValueArray(ByteValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new ByteValue((byte) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array is now full
+ assertTrue(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (ByteValue lv : lva) {
+ assertEquals((byte) idx++, lv.getValue());
+ }
+
+ // add element past end of array
+ assertFalse(lva.add(new ByteValue((byte) count)));
+ assertFalse(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ ByteValueArray lvaTo = new ByteValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+
+ @Test
+ public void testUnboundedArray() {
+ int count = 4096;
+
+ ValueArray<ByteValue> lva = new ByteValueArray();
+
+ // add several elements
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new ByteValue((byte) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array never fills
+ assertFalse(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (ByteValue lv : lva) {
+ assertEquals((byte) idx++, lv.getValue());
+ }
+
+ // add element past end of array
+ assertTrue(lva.add(new ByteValue((byte) count)));
+ assertTrue(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ ByteValueArray lvaTo = new ByteValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test mark/reset
+ int size = lva.size();
+ lva.mark();
+ assertTrue(lva.add(new ByteValue()));
+ assertEquals(size + 1, lva.size());
+ lva.reset();
+ assertEquals(size, lva.size());
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
new file mode 100644
index 0000000..8f06e70
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * Tests for {@link CharValueArrayComparator}.
+ */
+public class CharValueArrayComparatorTest extends ComparatorTestBase<CharValueArray> {
+
+ @Override
+ protected TypeComparator<CharValueArray> createComparator(boolean ascending) {
+ return new CharValueArrayComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<CharValueArray> createSerializer() {
+ return new CharValueArraySerializer();
+ }
+
+ @Override
+ protected CharValueArray[] getSortedTestData() {
+ CharValueArray lva0 = new CharValueArray();
+
+ CharValueArray lva1 = new CharValueArray();
+ lva1.add(new CharValue((char) 5));
+
+ CharValueArray lva2 = new CharValueArray();
+ lva2.add(new CharValue((char) 5));
+ lva2.add(new CharValue((char) 10));
+
+ return new CharValueArray[]{ lva0, lva1 };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
new file mode 100644
index 0000000..86ee10d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link CharValueArraySerializer}.
+ */
+public class CharValueArraySerializerTest extends SerializerTestBase<CharValueArray> {
+
+ @Override
+ protected TypeSerializer<CharValueArray> createSerializer() {
+ return new CharValueArraySerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<CharValueArray> getTypeClass() {
+ return CharValueArray.class;
+ }
+
+ @Override
+ protected CharValueArray[] getTestData() {
+ int defaultElements = CharValueArray.DEFAULT_CAPACITY_IN_BYTES / CharValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ Random rnd = new Random(874597969123412341L);
+ int rndLong = rnd.nextInt();
+
+ CharValueArray lva0 = new CharValueArray();
+
+ CharValueArray lva1 = new CharValueArray();
+ lva1.addAll(lva0);
+ lva1.add(new CharValue((char) 0));
+
+ CharValueArray lva2 = new CharValueArray();
+ lva2.addAll(lva1);
+ lva2.add(new CharValue((char) 1));
+
+ CharValueArray lva3 = new CharValueArray();
+ lva3.addAll(lva2);
+ lva3.add(new CharValue((char) -1));
+
+ CharValueArray lva4 = new CharValueArray();
+ lva4.addAll(lva3);
+ lva4.add(new CharValue(Character.MAX_VALUE));
+
+ CharValueArray lva5 = new CharValueArray();
+ lva5.addAll(lva4);
+ lva5.add(new CharValue(Character.MIN_VALUE));
+
+ CharValueArray lva6 = new CharValueArray();
+ lva6.addAll(lva5);
+ lva6.add(new CharValue((char) rndLong));
+
+ CharValueArray lva7 = new CharValueArray();
+ lva7.addAll(lva6);
+ lva7.add(new CharValue((char) -rndLong));
+
+ CharValueArray lva8 = new CharValueArray();
+ lva8.addAll(lva7);
+ for (int i = 0; i < 1.5 * defaultElements; i++) {
+ lva8.add(new CharValue((char) i));
+ }
+ lva8.addAll(lva8);
+
+ return new CharValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
new file mode 100644
index 0000000..3205d42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/CharValueArrayTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StringValueArray}.
+ */
+public class CharValueArrayTest {
+
+ @Test
+ public void testBoundedArray() {
+ // one byte for length and one byte for character
+ int count = StringValueArray.DEFAULT_CAPACITY_IN_BYTES / 2;
+
+ ValueArray<StringValue> sva = new StringValueArray(StringValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(sva.isFull());
+ assertEquals(i, sva.size());
+
+ assertTrue(sva.add(new StringValue(Character.toString((char) (i & 0x7F)))));
+
+ assertEquals(i + 1, sva.size());
+ }
+
+ // array is now full
+ assertTrue(sva.isFull());
+ assertEquals(count, sva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (StringValue sv : sva) {
+ assertEquals((idx++) & 0x7F, sv.getValue().charAt(0));
+ }
+
+ // add element past end of array
+ assertFalse(sva.add(new StringValue(String.valueOf((char) count))));
+ assertFalse(sva.addAll(sva));
+
+ // test copy
+ assertEquals(sva, sva.copy());
+
+ // test copyTo
+ StringValueArray svaTo = new StringValueArray();
+ sva.copyTo(svaTo);
+ assertEquals(sva, svaTo);
+
+ // test clear
+ sva.clear();
+ assertEquals(0, sva.size());
+ }
+
+ @Test
+ public void testBoundedArrayWithVariableLengthCharacters() {
+ // characters alternatingly take 1 and 2 bytes (plus one byte for length)
+ int count = 1280;
+
+ ValueArray<StringValue> sva = new StringValueArray(3200);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(sva.isFull());
+ assertEquals(i, sva.size());
+
+ assertTrue(sva.add(new StringValue(Character.toString((char) (i & 0xFF)))));
+
+ assertEquals(i + 1, sva.size());
+ }
+
+ // array is now full
+ assertTrue(sva.isFull());
+ assertEquals(count, sva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (StringValue sv : sva) {
+ assertEquals((idx++) & 0xFF, sv.getValue().charAt(0));
+ }
+
+ // add element past end of array
+ assertFalse(sva.add(new StringValue(String.valueOf((char) count))));
+ assertFalse(sva.addAll(sva));
+
+ // test copy
+ assertEquals(sva, sva.copy());
+
+ // test copyTo
+ StringValueArray svaTo = new StringValueArray();
+ sva.copyTo(svaTo);
+ assertEquals(sva, svaTo);
+
+ // test clear
+ sva.clear();
+ assertEquals(0, sva.size());
+ }
+
+ @Test
+ public void testUnboundedArray() {
+ int count = 4096;
+
+ ValueArray<StringValue> sva = new StringValueArray();
+
+ // add several elements
+ for (int i = 0; i < count; i++) {
+ assertFalse(sva.isFull());
+ assertEquals(i, sva.size());
+
+ assertTrue(sva.add(new StringValue(String.valueOf((char) i))));
+
+ assertEquals(i + 1, sva.size());
+ }
+
+ // array never fills
+ assertFalse(sva.isFull());
+ assertEquals(count, sva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (StringValue sv : sva) {
+ assertEquals(idx++, sv.getValue().charAt(0));
+ }
+
+ // add element past end of array
+ assertTrue(sva.add(new StringValue(String.valueOf((char) count))));
+ assertTrue(sva.addAll(sva));
+
+ // test copy
+ assertEquals(sva, sva.copy());
+
+ // test copyTo
+ StringValueArray svaTo = new StringValueArray();
+ sva.copyTo(svaTo);
+ assertEquals(sva, svaTo);
+
+ // test mark/reset
+ int size = sva.size();
+ sva.mark();
+ assertTrue(sva.add(new StringValue()));
+ assertEquals(size + 1, sva.size());
+ sva.reset();
+ assertEquals(size, sva.size());
+
+ // test clear
+ sva.clear();
+ assertEquals(0, sva.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
new file mode 100644
index 0000000..29ae4c9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * Tests for {@link DoubleValueArrayComparator}.
+ */
+public class DoubleValueArrayComparatorTest extends ComparatorTestBase<DoubleValueArray> {
+
+ @Override
+ protected TypeComparator<DoubleValueArray> createComparator(boolean ascending) {
+ return new DoubleValueArrayComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<DoubleValueArray> createSerializer() {
+ return new DoubleValueArraySerializer();
+ }
+
+ @Override
+ protected DoubleValueArray[] getSortedTestData() {
+ DoubleValueArray lva0 = new DoubleValueArray();
+
+ DoubleValueArray lva1 = new DoubleValueArray();
+ lva1.add(new DoubleValue(5));
+
+ DoubleValueArray lva2 = new DoubleValueArray();
+ lva2.add(new DoubleValue(5));
+ lva2.add(new DoubleValue(10));
+
+ return new DoubleValueArray[]{ lva0, lva1 };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
new file mode 100644
index 0000000..49c1f65
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link DoubleValueArraySerializer}.
+ */
+public class DoubleValueArraySerializerTest extends SerializerTestBase<DoubleValueArray> {
+
+ @Override
+ protected TypeSerializer<DoubleValueArray> createSerializer() {
+ return new DoubleValueArraySerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<DoubleValueArray> getTypeClass() {
+ return DoubleValueArray.class;
+ }
+
+ @Override
+ protected DoubleValueArray[] getTestData() {
+ int defaultElements = DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES / DoubleValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ Random rnd = new Random(874597969123412341L);
+ int rndLong = rnd.nextInt();
+
+ DoubleValueArray lva0 = new DoubleValueArray();
+
+ DoubleValueArray lva1 = new DoubleValueArray();
+ lva1.addAll(lva0);
+ lva1.add(new DoubleValue(0));
+
+ DoubleValueArray lva2 = new DoubleValueArray();
+ lva2.addAll(lva1);
+ lva2.add(new DoubleValue(1));
+
+ DoubleValueArray lva3 = new DoubleValueArray();
+ lva3.addAll(lva2);
+ lva3.add(new DoubleValue(-1));
+
+ DoubleValueArray lva4 = new DoubleValueArray();
+ lva4.addAll(lva3);
+ lva4.add(new DoubleValue(Double.MAX_VALUE));
+
+ DoubleValueArray lva5 = new DoubleValueArray();
+ lva5.addAll(lva4);
+ lva5.add(new DoubleValue(Double.MIN_VALUE));
+
+ DoubleValueArray lva6 = new DoubleValueArray();
+ lva6.addAll(lva5);
+ lva6.add(new DoubleValue(rndLong));
+
+ DoubleValueArray lva7 = new DoubleValueArray();
+ lva7.addAll(lva6);
+ lva7.add(new DoubleValue(-rndLong));
+
+ DoubleValueArray lva8 = new DoubleValueArray();
+ lva8.addAll(lva7);
+ for (int i = 0; i < 1.5 * defaultElements; i++) {
+ lva8.add(new DoubleValue(i));
+ }
+ lva8.addAll(lva8);
+
+ return new DoubleValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
new file mode 100644
index 0000000..e545ba1
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/DoubleValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.DoubleValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DoubleValueArray}.
+ */
+public class DoubleValueArrayTest {
+
+ @Test
+ public void testBoundedArray() {
+ int count = DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES / DoubleValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ ValueArray<DoubleValue> lva = new DoubleValueArray(DoubleValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new DoubleValue(i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array is now full
+ assertTrue(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (DoubleValue lv : lva) {
+ assertEquals(idx++, lv.getValue(), 0.000001);
+ }
+
+ // add element past end of array
+ assertFalse(lva.add(new DoubleValue(count)));
+ assertFalse(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ DoubleValueArray lvaTo = new DoubleValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+
+ @Test
+ public void testUnboundedArray() {
+ int count = 4096;
+
+ ValueArray<DoubleValue> lva = new DoubleValueArray();
+
+ // add several elements
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new DoubleValue(i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array never fills
+ assertFalse(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (DoubleValue lv : lva) {
+ assertEquals(idx++, lv.getValue(), 0.000001);
+ }
+
+ // add element past end of array
+ assertTrue(lva.add(new DoubleValue(count)));
+ assertTrue(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ DoubleValueArray lvaTo = new DoubleValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test mark/reset
+ int size = lva.size();
+ lva.mark();
+ assertTrue(lva.add(new DoubleValue()));
+ assertEquals(size + 1, lva.size());
+ lva.reset();
+ assertEquals(size, lva.size());
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
new file mode 100644
index 0000000..9651072
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+/**
+ * Tests for {@link FloatValueArrayComparator}.
+ */
+public class FloatValueArrayComparatorTest extends ComparatorTestBase<FloatValueArray> {
+
+ @Override
+ protected TypeComparator<FloatValueArray> createComparator(boolean ascending) {
+ return new FloatValueArrayComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<FloatValueArray> createSerializer() {
+ return new FloatValueArraySerializer();
+ }
+
+ @Override
+ protected FloatValueArray[] getSortedTestData() {
+ FloatValueArray lva0 = new FloatValueArray();
+
+ FloatValueArray lva1 = new FloatValueArray();
+ lva1.add(new FloatValue(5));
+
+ FloatValueArray lva2 = new FloatValueArray();
+ lva2.add(new FloatValue(5));
+ lva2.add(new FloatValue(10));
+
+ return new FloatValueArray[]{ lva0, lva1 };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
new file mode 100644
index 0000000..14312c3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link FloatValueArraySerializer}.
+ */
+public class FloatValueArraySerializerTest extends SerializerTestBase<FloatValueArray> {
+
+ @Override
+ protected TypeSerializer<FloatValueArray> createSerializer() {
+ return new FloatValueArraySerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<FloatValueArray> getTypeClass() {
+ return FloatValueArray.class;
+ }
+
+ @Override
+ protected FloatValueArray[] getTestData() {
+ int defaultElements = FloatValueArray.DEFAULT_CAPACITY_IN_BYTES / FloatValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ Random rnd = new Random(874597969123412341L);
+ int rndLong = rnd.nextInt();
+
+ FloatValueArray lva0 = new FloatValueArray();
+
+ FloatValueArray lva1 = new FloatValueArray();
+ lva1.addAll(lva0);
+ lva1.add(new FloatValue(0));
+
+ FloatValueArray lva2 = new FloatValueArray();
+ lva2.addAll(lva1);
+ lva2.add(new FloatValue(1));
+
+ FloatValueArray lva3 = new FloatValueArray();
+ lva3.addAll(lva2);
+ lva3.add(new FloatValue(-1));
+
+ FloatValueArray lva4 = new FloatValueArray();
+ lva4.addAll(lva3);
+ lva4.add(new FloatValue(Float.MAX_VALUE));
+
+ FloatValueArray lva5 = new FloatValueArray();
+ lva5.addAll(lva4);
+ lva5.add(new FloatValue(Float.MIN_VALUE));
+
+ FloatValueArray lva6 = new FloatValueArray();
+ lva6.addAll(lva5);
+ lva6.add(new FloatValue(rndLong));
+
+ FloatValueArray lva7 = new FloatValueArray();
+ lva7.addAll(lva6);
+ lva7.add(new FloatValue(-rndLong));
+
+ FloatValueArray lva8 = new FloatValueArray();
+ lva8.addAll(lva7);
+ for (int i = 0; i < 1.5 * defaultElements; i++) {
+ lva8.add(new FloatValue(i));
+ }
+ lva8.addAll(lva8);
+
+ return new FloatValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
new file mode 100644
index 0000000..5bab761
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/FloatValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.FloatValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FloatValueArray}.
+ */
+public class FloatValueArrayTest {
+
+ @Test
+ public void testBoundedArray() {
+ int count = FloatValueArray.DEFAULT_CAPACITY_IN_BYTES / FloatValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ ValueArray<FloatValue> lva = new FloatValueArray(FloatValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new FloatValue((byte) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array is now full
+ assertTrue(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (FloatValue lv : lva) {
+ assertEquals((byte) idx++, lv.getValue(), 0.000001);
+ }
+
+ // add element past end of array
+ assertFalse(lva.add(new FloatValue((byte) count)));
+ assertFalse(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ FloatValueArray lvaTo = new FloatValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+
+ @Test
+ public void testUnboundedArray() {
+ int count = 4096;
+
+ ValueArray<FloatValue> lva = new FloatValueArray();
+
+ // add several elements
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new FloatValue((byte) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array never fills
+ assertFalse(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (FloatValue lv : lva) {
+ assertEquals((byte) idx++, lv.getValue(), 0.000001);
+ }
+
+ // add element past end of array
+ assertTrue(lva.add(new FloatValue((byte) count)));
+ assertTrue(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ FloatValueArray lvaTo = new FloatValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test mark/reset
+ int size = lva.size();
+ lva.mark();
+ assertTrue(lva.add(new FloatValue()));
+ assertEquals(size + 1, lva.size());
+ lva.reset();
+ assertEquals(size, lva.size());
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
index e76b840..8774249 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
@@ -46,8 +46,8 @@ public class LongValueArrayComparatorTest extends ComparatorTestBase<LongValueAr
lva1.add(new LongValue(5));
LongValueArray lva2 = new LongValueArray();
- lva2.add(new LongValue(50));
- lva2.add(new LongValue(100));
+ lva2.add(new LongValue(5));
+ lva2.add(new LongValue(10));
return new LongValueArray[]{ lva0, lva1, lva2 };
}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
new file mode 100644
index 0000000..1ceab4b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayComparatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * Tests for {@link ShortValueArrayComparator}.
+ */
+public class ShortValueArrayComparatorTest extends ComparatorTestBase<ShortValueArray> {
+
+ @Override
+ protected TypeComparator<ShortValueArray> createComparator(boolean ascending) {
+ return new ShortValueArrayComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<ShortValueArray> createSerializer() {
+ return new ShortValueArraySerializer();
+ }
+
+ @Override
+ protected ShortValueArray[] getSortedTestData() {
+ ShortValueArray lva0 = new ShortValueArray();
+
+ ShortValueArray lva1 = new ShortValueArray();
+ lva1.add(new ShortValue((short) 5));
+
+ ShortValueArray lva2 = new ShortValueArray();
+ lva2.add(new ShortValue((short) 5));
+ lva2.add(new ShortValue((short) 10));
+
+ return new ShortValueArray[]{ lva0, lva1 };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
new file mode 100644
index 0000000..005f4d1
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link ShortValueArraySerializer}.
+ */
+public class ShortValueArraySerializerTest extends SerializerTestBase<ShortValueArray> {
+
+ @Override
+ protected TypeSerializer<ShortValueArray> createSerializer() {
+ return new ShortValueArraySerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<ShortValueArray> getTypeClass() {
+ return ShortValueArray.class;
+ }
+
+ @Override
+ protected ShortValueArray[] getTestData() {
+ int defaultElements = ShortValueArray.DEFAULT_CAPACITY_IN_BYTES / ShortValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ Random rnd = new Random(874597969123412341L);
+ int rndLong = rnd.nextInt();
+
+ ShortValueArray lva0 = new ShortValueArray();
+
+ ShortValueArray lva1 = new ShortValueArray();
+ lva1.addAll(lva0);
+ lva1.add(new ShortValue((short) 0));
+
+ ShortValueArray lva2 = new ShortValueArray();
+ lva2.addAll(lva1);
+ lva2.add(new ShortValue((short) 1));
+
+ ShortValueArray lva3 = new ShortValueArray();
+ lva3.addAll(lva2);
+ lva3.add(new ShortValue((short) -1));
+
+ ShortValueArray lva4 = new ShortValueArray();
+ lva4.addAll(lva3);
+ lva4.add(new ShortValue(Short.MAX_VALUE));
+
+ ShortValueArray lva5 = new ShortValueArray();
+ lva5.addAll(lva4);
+ lva5.add(new ShortValue(Short.MIN_VALUE));
+
+ ShortValueArray lva6 = new ShortValueArray();
+ lva6.addAll(lva5);
+ lva6.add(new ShortValue((short) rndLong));
+
+ ShortValueArray lva7 = new ShortValueArray();
+ lva7.addAll(lva6);
+ lva7.add(new ShortValue((short) -rndLong));
+
+ ShortValueArray lva8 = new ShortValueArray();
+ lva8.addAll(lva7);
+ for (int i = 0; i < 1.5 * defaultElements; i++) {
+ lva8.add(new ShortValue((short) i));
+ }
+ lva8.addAll(lva8);
+
+ return new ShortValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/546e9a77/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
new file mode 100644
index 0000000..eed1a28
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ShortValueArrayTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.ShortValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ShortValueArray}.
+ */
+public class ShortValueArrayTest {
+
+ @Test
+ public void testBoundedArray() {
+ int count = ShortValueArray.DEFAULT_CAPACITY_IN_BYTES / ShortValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+ ValueArray<ShortValue> lva = new ShortValueArray(ShortValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+ // fill the array
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new ShortValue((short) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array is now full
+ assertTrue(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (ShortValue lv : lva) {
+ assertEquals((short) idx++, lv.getValue());
+ }
+
+ // add element past end of array
+ assertFalse(lva.add(new ShortValue((short) count)));
+ assertFalse(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ ShortValueArray lvaTo = new ShortValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+
+ @Test
+ public void testUnboundedArray() {
+ int count = 4096;
+
+ ValueArray<ShortValue> lva = new ShortValueArray();
+
+ // add several elements
+ for (int i = 0; i < count; i++) {
+ assertFalse(lva.isFull());
+ assertEquals(i, lva.size());
+
+ assertTrue(lva.add(new ShortValue((short) i)));
+
+ assertEquals(i + 1, lva.size());
+ }
+
+ // array never fills
+ assertFalse(lva.isFull());
+ assertEquals(count, lva.size());
+
+ // verify the array values
+ int idx = 0;
+ for (ShortValue lv : lva) {
+ assertEquals((short) idx++, lv.getValue());
+ }
+
+ // add element past end of array
+ assertTrue(lva.add(new ShortValue((short) count)));
+ assertTrue(lva.addAll(lva));
+
+ // test copy
+ assertEquals(lva, lva.copy());
+
+ // test copyTo
+ ShortValueArray lvaTo = new ShortValueArray();
+ lva.copyTo(lvaTo);
+ assertEquals(lva, lvaTo);
+
+ // test mark/reset
+ int size = lva.size();
+ lva.mark();
+ assertTrue(lva.add(new ShortValue()));
+ assertEquals(size + 1, lva.size());
+ lva.reset();
+ assertEquals(size, lva.size());
+
+ // test clear
+ lva.clear();
+ assertEquals(0, lva.size());
+ }
+}