You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/02 19:25:42 UTC
[1/2] git commit: [FLINK-610] Added KryoSerializer
Repository: incubator-flink
Updated Branches:
refs/heads/master d60a3169f -> dac281f40
[FLINK-610] Added KryoSerializer
This closes #74
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22203e75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22203e75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22203e75
Branch: refs/heads/master
Commit: 22203e75f8a0d193ce0187396d0b267e05e9b58e
Parents: d60a316
Author: Till Rohrmann <ti...@gmail.com>
Authored: Thu Jul 17 17:21:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 16:35:00 2014 +0200
----------------------------------------------------------------------
.../api/java/typeutils/GenericTypeInfo.java | 4 +-
.../typeutils/runtime/DataInputViewStream.java | 68 ++++
.../typeutils/runtime/DataOutputViewStream.java | 41 ++
.../java/typeutils/runtime/KryoSerializer.java | 120 ++++++
.../java/typeutils/runtime/NoFetchingInput.java | 140 +++++++
.../AbstractGenericArraySerializerTest.java | 186 +++++++++
.../AbstractGenericTypeComparatorTest.java | 376 ++++++++++++++++++
.../AbstractGenericTypeSerializerTest.java | 300 +++++++++++++++
.../runtime/AvroGenericArraySerializerTest.java | 28 ++
.../runtime/AvroGenericTypeComparatorTest.java | 28 ++
.../runtime/AvroGenericTypeSerializerTest.java | 29 ++
.../runtime/GenericArraySerializerTest.java | 186 ---------
.../runtime/GenericTypeComparatorTest.java | 379 -------------------
.../runtime/GenericTypeSerializerTest.java | 300 ---------------
.../runtime/KryoGenericArraySerializerTest.java | 28 ++
.../runtime/KryoGenericTypeComparatorTest.java | 28 ++
.../runtime/KryoGenericTypeSerializerTest.java | 28 ++
.../typeutils/runtime/TupleSerializerTest.java | 11 +-
18 files changed, 1407 insertions(+), 873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index cbc36ed..f7f78c1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
import org.apache.flink.types.TypeInformation;
@@ -64,7 +64,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
public TypeSerializer<T> createSerializer() {
- return new AvroSerializer<T>(this.typeClass);
+ return new KryoSerializer<T>(this.typeClass);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
new file mode 100644
index 0000000..81b5bae
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class DataInputViewStream extends InputStream{
+ protected DataInputView inputView;
+
+ public DataInputViewStream(DataInputView inputView){
+ this.inputView = inputView;
+ }
+
+ public DataInputView getInputView(){
+ return inputView;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try{
+ return inputView.readUnsignedByte();
+ }catch(EOFException ex){
+ return -1;
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long counter = n;
+ while(counter > Integer.MAX_VALUE){
+ int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);
+
+ if(skippedBytes == 0){
+ return n - counter;
+ }
+
+ counter -= skippedBytes;
+ }
+
+ return n - counter - inputView.skipBytes((int) counter);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inputView.read(b, off, len);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
new file mode 100644
index 0000000..6a4909e
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class DataOutputViewStream extends OutputStream {
+ protected DataOutputView outputView;
+
+ public DataOutputViewStream(DataOutputView outputView){
+ this.outputView = outputView;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ outputView.writeByte(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outputView.write(b, off, len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
new file mode 100644
index 0000000..ad7f83a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+public class KryoSerializer<T> extends TypeSerializer<T> {
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> type;
+ private final Class<? extends T> typeToInstantiate;
+
+ private transient Kryo kryo;
+ private transient T copyInstance = null;
+ private transient Input in = null;
+
+ public KryoSerializer(Class<T> type){
+ this(type,type);
+ }
+
+ public KryoSerializer(Class<T> type, Class<? extends T> typeToInstantiate){
+ if(type == null || typeToInstantiate == null){
+ throw new NullPointerException("Type class cannot be null.");
+ }
+
+ this.type = type;
+ this.typeToInstantiate = typeToInstantiate;
+ kryo = new Kryo();
+ kryo.setAsmEnabled(true);
+ kryo.register(type);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public boolean isStateful() {
+ return true;
+ }
+
+ @Override
+ public T createInstance() {
+ checkKryoInitialized();
+ return kryo.newInstance(typeToInstantiate);
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ checkKryoInitialized();
+ reuse = kryo.copy(from);
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ checkKryoInitialized();
+ DataOutputViewStream outputStream = new DataOutputViewStream(target);
+ Output out = new Output(outputStream);
+ kryo.writeObject(out, record);
+ out.flush();
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ checkKryoInitialized();
+ DataInputViewStream inputStream = new DataInputViewStream(source);
+ Input in = new NoFetchingInput(inputStream);
+ reuse = kryo.readObject(in, typeToInstantiate);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ checkKryoInitialized();
+ if(this.copyInstance == null){
+ this.copyInstance = createInstance();
+ }
+
+ T tmp = deserialize(copyInstance, source);
+ serialize(tmp, target);
+ }
+
+ private final void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = new Kryo();
+ this.kryo.setAsmEnabled(true);
+ this.kryo.register(type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
new file mode 100644
index 0000000..5462000
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class NoFetchingInput extends Input {
+ public NoFetchingInput(InputStream inputStream){
+ super(inputStream, 8);
+ }
+
+ @Override
+ public boolean eof(){
+ throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
+ }
+
+ @Override
+ public int read() throws KryoException {
+ require(1);
+ return buffer[position++] & 0xFF;
+ }
+
+ @Override
+ public boolean canReadInt() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ @Override
+ public boolean canReadLong() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ /**
+ * Require makes sure that at least required number of bytes are kept in the buffer. If not, then
+ * it will load exactly the difference between required and currently available number of bytes.
+ * Thus, it will only load the data which is required and never prefetch data.
+ *
+ * @param required the number of bytes being available in the buffer
+ * @return the number of bytes remaining, which is equal to required
+ * @throws KryoException
+ */
+ @Override
+ protected int require(int required) throws KryoException {
+ if(required > capacity) {
+ throw new KryoException("Buffer too small: capacity: " + capacity + ", " +
+ "required: " + required);
+ }
+
+ position = 0;
+ int bytesRead = 0;
+ int count;
+ while(true){
+ count = fill(buffer, bytesRead, required - bytesRead);
+
+ if(count == -1){
+ throw new KryoException("Buffer underflow");
+ }
+
+ bytesRead += count;
+ if(bytesRead == required){
+ break;
+ }
+ }
+ limit = required;
+ return required;
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int count) throws KryoException {
+ if(bytes == null){
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try {
+ return inputStream.read(bytes, offset, count);
+ }catch(IOException ex){
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void skip(int count) throws KryoException {
+ try{
+ inputStream.skip(count);
+ }catch(IOException ex){
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
+ if(bytes == null){
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try{
+ int bytesRead = 0;
+ int c;
+
+ while(true){
+ c = inputStream.read(bytes, offset+bytesRead, count-bytesRead);
+
+ if(c == -1){
+ throw new KryoException("Buffer underflow");
+ }
+
+ bytesRead += c;
+
+ if(bytesRead == count){
+ break;
+ }
+ }
+ }catch(IOException ex){
+ throw new KryoException(ex);
+ }
+ }
+
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
new file mode 100644
index 0000000..0700923
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
+import org.apache.flink.util.StringUtils;
+
+abstract public class AbstractGenericArraySerializerTest {
+
+ private final Random rnd = new Random(349712539451944123L);
+
+
+ @Test
+ public void testString() {
+ String[] arr1 = new String[] {"abc", "",
+ StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+ StringUtils.getRandomString(new Random(289347567856686223L), 15, 50),
+ StringUtils.getRandomString(new Random(289347567856686223L), 30, 170),
+ StringUtils.getRandomString(new Random(289347567856686223L), 14, 15),
+ ""};
+
+ String[] arr2 = new String[] {"foo", "",
+ StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+ StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
+ StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
+ StringUtils.getRandomString(new Random(289347567856686223L), 100*1024, 105*1024),
+ "bar"};
+
+ // run tests with the string serializer as the component serializer
+ runTests(String.class, new StringSerializer(), arr1, arr2);
+
+ // run the tests with the generic serializer as the component serializer
+ runTests(arr1, arr2);
+ }
+
+ @Test
+ public void testSimpleTypesObjects() {
+ SimpleTypes a = new SimpleTypes();
+ SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+
+ runTests(new SimpleTypes[] {a, b, c}, new SimpleTypes[] {d, e, f, g});
+ }
+
+ @Test
+ public void testCompositeObject() {
+ ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
+ ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
+ ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
+ ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
+ ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
+
+ runTests(new ComplexNestedObject1[] {o1, o2}, new ComplexNestedObject1[] {o3}, new ComplexNestedObject1[] {o4, o5});
+ }
+
+ @Test
+ public void testNestedObjects() {
+ ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
+ ComplexNestedObject2 o2 = new ComplexNestedObject2();
+ ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
+ ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
+
+ runTests( new ComplexNestedObject2[] {o1, o2, o3},
+ new ComplexNestedObject2[] {},
+ new ComplexNestedObject2[] {},
+ new ComplexNestedObject2[] {o4},
+ new ComplexNestedObject2[] {});
+ }
+
+ @Test
+ public void testBeanStyleObjects() {
+ {
+ Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
+ Book b2 = new Book(0L, "Debugging byte streams", 1337);
+ Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+ Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF);
+ Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00);
+ Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L);
+
+ runTests( new Book[] {b1, b2},
+ new Book[] {},
+ new Book[] {},
+ new Book[] {},
+ new Book[] {},
+ new Book[] {b3, b4, b5, b6});
+ }
+
+ // object with collection
+ {
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("A");
+ list.add("B");
+ list.add("C");
+ list.add("D");
+ list.add("E");
+
+ BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
+
+ ArrayList<String> list2 = new ArrayList<String>();
+ BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
+
+ runTests(new BookAuthor[] {b1, b2});
+ }
+ }
+
+ private final <T> void runTests(T[]... instances) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<T> type = (Class<T>) instances[0][0].getClass();
+ TypeSerializer<T> serializer = createComponentSerializer(type);
+ runTests(type, serializer, instances);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private final <T> void runTests(Class<T> type, TypeSerializer<T> componentSerializer, T[]... instances) {
+ try {
+ if (type == null || componentSerializer == null || instances == null || instances.length == 0) {
+ throw new IllegalArgumentException();
+ }
+
+ @SuppressWarnings("unchecked")
+ Class<T[]> arrayClass = (Class<T[]>) Array.newInstance(type, 0).getClass();
+
+ GenericArraySerializer<T> serializer = createSerializer(type, componentSerializer);
+ SerializerTestInstance<T[]> test = new SerializerTestInstance<T[]>(serializer, arrayClass, -1, instances);
+ test.testAll();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private final <T> GenericArraySerializer<T> createSerializer(Class<T> type, TypeSerializer<T> componentSerializer) {
+ return new GenericArraySerializer<T>(type, componentSerializer);
+ }
+
+ abstract protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
new file mode 100644
index 0000000..dee7d9a
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+abstract public class AbstractGenericTypeComparatorTest {
+
+ @Test
+ public void testString() {
+ runTests(new String[]{
+ "",
+ "Lorem Ipsum Dolor Omit Longer",
+ "aaaa",
+ "abcd",
+ "abce",
+ "abdd",
+ "accd",
+ "bbcd"
+ });
+ }
+
+ @Test
+ public void testSimpleTypesObjects() {
+ runTests(
+ new SimpleTypes(0, 1, (byte) 2, "", (short) 3, 4.0),
+ new SimpleTypes(1, 1, (byte) 2, "", (short) 3, 4.0),
+ new SimpleTypes(1, 2, (byte) 2, "", (short) 3, 4.0),
+ new SimpleTypes(1, 2, (byte) 3, "", (short) 3, 4.0),
+ new SimpleTypes(1, 2, (byte) 3, "a", (short) 3, 4.0),
+ new SimpleTypes(1, 2, (byte) 3, "b", (short) 3, 4.0),
+ new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 4.0),
+ new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 6.0)
+ );
+ }
+
+ @Test
+ public void testCompositeObject() {
+ ComplexNestedObject1 o1 = new ComplexNestedObject1(-1100);
+ ComplexNestedObject1 o2 = new ComplexNestedObject1(0);
+ ComplexNestedObject1 o3 = new ComplexNestedObject1(44);
+ ComplexNestedObject1 o4 = new ComplexNestedObject1(76923, "A");
+ ComplexNestedObject1 o5 = new ComplexNestedObject1(5626435, "A somewhat random collection");
+
+ runTests(o1, o2, o3, o4, o5);
+ }
+
+ @Test
+ public void testBeanStyleObjects() {
+ {
+ Book b111 = new Book(-1L, "A Low level interfaces", 0xC);
+ Book b122 = new Book(-1L, "Low level interfaces", 0xC);
+ Book b123 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+
+ Book b2 = new Book(0L, "Debugging byte streams", 1337);
+ Book b3 = new Book(976243875L, "The Serialization Odysse", 42);
+
+ runTests(b111, b122, b123, b2, b3);
+ }
+
+ {
+ BookAuthor b1 = new BookAuthor(976243875L, new ArrayList<String>(), "Arno Nym");
+
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("A");
+ list.add("B");
+ list.add("C");
+ list.add("D");
+ list.add("E");
+
+ BookAuthor b2 = new BookAuthor(976243875L, list, "The Saurus");
+
+ runTests(b1, b2);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private final <T> void runTests(T... sortedTestData) {
+ ComparatorTestInstance<T> testBase = new ComparatorTestInstance<T>(sortedTestData);
+ testBase.testAll();
+ }
+
+ abstract protected <T> TypeSerializer<T> createSerializer(Class<T> type);
+
+ // ------------------------------------------------------------------------
+ // test instance
+ // ------------------------------------------------------------------------
+
+ private class ComparatorTestInstance<T> extends ComparatorTestBase<T> {
+
+ private final T[] testData;
+
+ private final Class<T> type;
+
+ @SuppressWarnings("unchecked")
+ public ComparatorTestInstance(T[] testData) {
+ if (testData == null || testData.length == 0) {
+ throw new IllegalArgumentException();
+ }
+
+ this.testData = testData;
+ this.type = (Class<T>) testData[0].getClass();
+ }
+
+ @Override
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected TypeComparator<T> createComparator(boolean ascending) {
+ return new GenericTypeComparator(ascending, AbstractGenericTypeComparatorTest.this.createSerializer(this
+ .type), this.type);
+ }
+
+ @Override
+ protected TypeSerializer<T> createSerializer() {
+ return AbstractGenericTypeComparatorTest.this.createSerializer(this.type);
+ }
+
+ @Override
+ protected T[] getSortedTestData() {
+ return this.testData;
+ }
+
+ public void testAll() {
+ testDuplicate();
+ testEquality();
+ testEqualityWithReference();
+ testInequality();
+ testInequalityWithReference();
+ testNormalizedKeysEqualsFullLength();
+ testNormalizedKeysEqualsHalfLength();
+ testNormalizedKeysGreatSmallFullLength();
+ testNormalizedKeysGreatSmallAscDescHalfLength();
+ testNormalizedKeyReadWriter();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test objects
+ // ------------------------------------------------------------------------
+
+ public static final class SimpleTypes implements Comparable<SimpleTypes> {
+
+ private final int iVal;
+ private final long lVal;
+ private final byte bVal;
+ private final String sVal;
+ private final short rVal;
+ private final double dVal;
+
+ public SimpleTypes() {
+ this(0, 0, (byte) 0, "", (short) 0, 0);
+ }
+
+ public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+ this.iVal = iVal;
+ this.lVal = lVal;
+ this.bVal = bVal;
+ this.sVal = sVal;
+ this.rVal = rVal;
+ this.dVal = dVal;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == SimpleTypes.class) {
+ SimpleTypes other = (SimpleTypes) obj;
+
+ return other.iVal == this.iVal &&
+ other.lVal == this.lVal &&
+ other.bVal == this.bVal &&
+ other.sVal.equals(this.sVal) &&
+ other.rVal == this.rVal &&
+ other.dVal == this.dVal;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(SimpleTypes o) {
+ int cmp = (this.iVal < o.iVal ? -1 : (this.iVal == o.iVal ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = (this.lVal < o.lVal ? -1 : (this.lVal == o.lVal ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = (this.bVal < o.bVal ? -1 : (this.bVal == o.bVal ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = this.sVal.compareTo(o.sVal);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = (this.rVal < o.rVal ? -1 : (this.rVal == o.rVal ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return (this.dVal < o.dVal ? -1 : (this.dVal == o.dVal ? 0 : 1));
+ }
+ }
+
+ public static class ComplexNestedObject1 implements Comparable<ComplexNestedObject1> {
+
+ private double doubleValue;
+
+ private List<String> stringList;
+
+ public ComplexNestedObject1() {
+ }
+
+ public ComplexNestedObject1(double value, String... listElements) {
+ this.doubleValue = value;
+
+ this.stringList = new ArrayList<String>();
+ for (String str : listElements) {
+ this.stringList.add(str);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == ComplexNestedObject1.class) {
+ ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+ return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(ComplexNestedObject1 o) {
+ int cmp = (this.doubleValue < o.doubleValue ? -1 : (this.doubleValue == o.doubleValue ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ int size = this.stringList.size();
+ int otherSize = o.stringList.size();
+
+ cmp = (size < otherSize ? -1 : (size == otherSize ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ for (int i = 0; i < size; i++) {
+ cmp = this.stringList.get(i).compareTo(o.stringList.get(i));
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+ }
+
+ public static class Book implements Comparable<Book> {
+
+ private long bookId;
+ private String title;
+ private long authorId;
+
+ public Book() {
+ }
+
+ public Book(long bookId, String title, long authorId) {
+ this.bookId = bookId;
+ this.title = title;
+ this.authorId = authorId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == Book.class) {
+ Book other = (Book) obj;
+ return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(Book o) {
+ int cmp = (this.bookId < o.bookId ? -1 : (this.bookId == o.bookId ? 0 : 1));
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = title.compareTo(o.title);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
+ }
+ }
+
+ public static class BookAuthor implements Comparable<BookAuthor> {
+
+ private long authorId;
+ private List<String> bookTitles;
+ private String authorName;
+
+ public BookAuthor() {
+ }
+
+ public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+ this.authorId = authorId;
+ this.bookTitles = bookTitles;
+ this.authorName = authorName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == BookAuthor.class) {
+ BookAuthor other = (BookAuthor) obj;
+ return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+ other.bookTitles.equals(this.bookTitles);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(BookAuthor o) {
+ int cmp = (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
+ if (cmp != 0) return cmp;
+
+ int size = this.bookTitles.size();
+ int oSize = o.bookTitles.size();
+ cmp = (size < oSize ? -1 : (size == oSize ? 0 : 1));
+ if (cmp != 0) return cmp;
+
+ for (int i = 0; i < size; i++) {
+ cmp = this.bookTitles.get(i).compareTo(o.bookTitles.get(i));
+ if (cmp != 0) return cmp;
+ }
+
+ return this.authorName.compareTo(o.authorName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
new file mode 100644
index 0000000..8f97c43
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A test for the {@link AvroSerializer}.
+ */
+abstract public class AbstractGenericTypeSerializerTest {
+
+ private final Random rnd = new Random(349712539451944123L);
+
+
+ @Test
+ public void testString() {
+ runTests("abc", "",
+ StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+ StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
+ StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
+ StringUtils.getRandomString(new Random(289347567856686223L), 100 * 1024, 105 * 1024));
+ }
+
+ @Test
+ public void testSimpleTypesObjects() {
+ SimpleTypes a = new SimpleTypes();
+ SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+ SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+ StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+
+ runTests(a, b, c, d, e, f, g);
+ }
+
+ @Test
+ public void testCompositeObject() {
+ ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
+ ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
+ ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
+ ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
+ ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
+
+ runTests(o1, o2, o3, o4, o5);
+ }
+
+ @Test
+ public void testNestedObjects() {
+ ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
+ ComplexNestedObject2 o2 = new ComplexNestedObject2();
+ ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
+ ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
+
+ runTests(o1, o2, o3, o4);
+ }
+
+ @Test
+ public void testBeanStyleObjects() {
+ {
+ Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
+ Book b2 = new Book(0L, "Debugging byte streams", 1337);
+ Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+
+ runTests(b1, b2, b3);
+ }
+
+ // object with collection
+ {
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("A");
+ list.add("B");
+ list.add("C");
+ list.add("D");
+ list.add("E");
+
+ BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
+
+ ArrayList<String> list2 = new ArrayList<String>();
+ BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
+
+ runTests(b1, b2);
+ }
+ }
+
+ private final <T> void runTests(T... instances) {
+ if (instances == null || instances.length == 0) {
+ throw new IllegalArgumentException();
+ }
+
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) instances[0].getClass();
+
+ TypeSerializer<T> serializer = createSerializer(clazz);
+ SerializerTestInstance<T> test = new SerializerTestInstance<T>(serializer, clazz, -1, instances);
+ test.testAll();
+ }
+
+ abstract protected <T> TypeSerializer<T> createSerializer(Class<T> type);
+
+
+ // --------------------------------------------------------------------------------------------
+ // Test Objects
+ // --------------------------------------------------------------------------------------------
+
+
+ public static final class SimpleTypes {
+
+ private final int iVal;
+ private final long lVal;
+ private final byte bVal;
+ private final String sVal;
+ private final short rVal;
+ private final double dVal;
+
+
+ public SimpleTypes() {
+ this(0, 0, (byte) 0, "", (short) 0, 0);
+ }
+
+ public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+ this.iVal = iVal;
+ this.lVal = lVal;
+ this.bVal = bVal;
+ this.sVal = sVal;
+ this.rVal = rVal;
+ this.dVal = dVal;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == SimpleTypes.class) {
+ SimpleTypes other = (SimpleTypes) obj;
+
+ return other.iVal == this.iVal &&
+ other.lVal == this.lVal &&
+ other.bVal == this.bVal &&
+ other.sVal.equals(this.sVal) &&
+ other.rVal == this.rVal &&
+ other.dVal == this.dVal;
+
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
+ }
+ }
+
+ public static class ComplexNestedObject1 {
+
+ private double doubleValue;
+
+ private List<String> stringList;
+
+ public ComplexNestedObject1() {
+ }
+
+ public ComplexNestedObject1(int offInit) {
+ this.doubleValue = 6293485.6723 + offInit;
+
+ this.stringList = new ArrayList<String>();
+ this.stringList.add("A" + offInit);
+ this.stringList.add("somewhat" + offInit);
+ this.stringList.add("random" + offInit);
+ this.stringList.add("collection" + offInit);
+ this.stringList.add("of" + offInit);
+ this.stringList.add("strings" + offInit);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == ComplexNestedObject1.class) {
+ ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+ return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class ComplexNestedObject2 {
+
+ private long longValue;
+
+ private Map<String, ComplexNestedObject1> theMap = new HashMap<String, ComplexNestedObject1>();
+
+ public ComplexNestedObject2() {
+ }
+
+ public ComplexNestedObject2(Random rnd) {
+ this.longValue = rnd.nextLong();
+
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == ComplexNestedObject2.class) {
+ ComplexNestedObject2 other = (ComplexNestedObject2) obj;
+ return other.longValue == this.longValue && this.theMap.equals(other.theMap);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class Book {
+
+ private long bookId;
+ private String title;
+ private long authorId;
+
+ public Book() {
+ }
+
+ public Book(long bookId, String title, long authorId) {
+ this.bookId = bookId;
+ this.title = title;
+ this.authorId = authorId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == Book.class) {
+ Book other = (Book) obj;
+ return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static class BookAuthor {
+
+ private long authorId;
+ private List<String> bookTitles;
+ private String authorName;
+
+ public BookAuthor() {
+ }
+
+ public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+ this.authorId = authorId;
+ this.bookTitles = bookTitles;
+ this.authorName = authorName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == BookAuthor.class) {
+ BookAuthor other = (BookAuthor) obj;
+ return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+ other.bookTitles.equals(this.bookTitles);
+ } else {
+ return false;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
new file mode 100644
index 0000000..50b4e06
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+ @Override
+ protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+ return new AvroSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
new file mode 100644
index 0000000..587e7c4
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+ @Override
+ protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+ return new AvroSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
new file mode 100644
index 0000000..9f4d482
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+ @Override
+ protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+ return new AvroSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
deleted file mode 100644
index af31684..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Random;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.Book;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.BookAuthor;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject1;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject2;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.SimpleTypes;
-import org.apache.flink.util.StringUtils;
-
-public class GenericArraySerializerTest {
-
- private final Random rnd = new Random(349712539451944123L);
-
-
- @Test
- public void testString() {
- String[] arr1 = new String[] {"abc", "",
- StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
- StringUtils.getRandomString(new Random(289347567856686223L), 15, 50),
- StringUtils.getRandomString(new Random(289347567856686223L), 30, 170),
- StringUtils.getRandomString(new Random(289347567856686223L), 14, 15),
- ""};
-
- String[] arr2 = new String[] {"foo", "",
- StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
- StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
- StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
- StringUtils.getRandomString(new Random(289347567856686223L), 100*1024, 105*1024),
- "bar"};
-
- // run tests with the string serializer as the component serializer
- runTests(String.class, new StringSerializer(), arr1, arr2);
-
- // run the tests with the generic serializer as the component serializer
- runTests(arr1, arr2);
- }
-
- @Test
- public void testSimpleTypesObjects() {
- SimpleTypes a = new SimpleTypes();
- SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-
- runTests(new SimpleTypes[] {a, b, c}, new SimpleTypes[] {d, e, f, g});
- }
-
- @Test
- public void testCompositeObject() {
- ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
- ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
- ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
- ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
- ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-
- runTests(new ComplexNestedObject1[] {o1, o2}, new ComplexNestedObject1[] {o3}, new ComplexNestedObject1[] {o4, o5});
- }
-
- @Test
- public void testNestedObjects() {
- ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
- ComplexNestedObject2 o2 = new ComplexNestedObject2();
- ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
- ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
-
- runTests( new ComplexNestedObject2[] {o1, o2, o3},
- new ComplexNestedObject2[] {},
- new ComplexNestedObject2[] {},
- new ComplexNestedObject2[] {o4},
- new ComplexNestedObject2[] {});
- }
-
- @Test
- public void testBeanStyleObjects() {
- {
- Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
- Book b2 = new Book(0L, "Debugging byte streams", 1337);
- Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
- Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF);
- Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00);
- Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L);
-
- runTests( new Book[] {b1, b2},
- new Book[] {},
- new Book[] {},
- new Book[] {},
- new Book[] {},
- new Book[] {b3, b4, b5, b6});
- }
-
- // object with collection
- {
- ArrayList<String> list = new ArrayList<String>();
- list.add("A");
- list.add("B");
- list.add("C");
- list.add("D");
- list.add("E");
-
- BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
-
- ArrayList<String> list2 = new ArrayList<String>();
- BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
-
- runTests(new BookAuthor[] {b1, b2});
- }
- }
-
- private final <T> void runTests(T[]... instances) {
- try {
- @SuppressWarnings("unchecked")
- Class<T> type = (Class<T>) instances[0][0].getClass();
- TypeSerializer<T> serializer = new AvroSerializer<T>(type);
- runTests(type, serializer, instances);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- private final <T> void runTests(Class<T> type, TypeSerializer<T> componentSerializer, T[]... instances) {
- try {
- if (type == null || componentSerializer == null || instances == null || instances.length == 0) {
- throw new IllegalArgumentException();
- }
-
- @SuppressWarnings("unchecked")
- Class<T[]> arrayClass = (Class<T[]>) Array.newInstance(type, 0).getClass();
-
- GenericArraySerializer<T> serializer = createSerializer(type, componentSerializer);
- SerializerTestInstance<T[]> test = new SerializerTestInstance<T[]>(serializer, arrayClass, -1, instances);
- test.testAll();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- private final <T> GenericArraySerializer<T> createSerializer(Class<T> type, TypeSerializer<T> componentSerializer) {
- return new GenericArraySerializer<T>(type, componentSerializer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
deleted file mode 100644
index 154a06f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class GenericTypeComparatorTest {
-
- @Test
- public void testString() {
- runTests(new String[]{
- "",
- "Lorem Ipsum Dolor Omit Longer",
- "aaaa",
- "abcd",
- "abce",
- "abdd",
- "accd",
- "bbcd"
- });
- }
-
- @Test
- public void testSimpleTypesObjects() {
- runTests(
- new SimpleTypes(0, 1, (byte) 2, "", (short) 3, 4.0),
- new SimpleTypes(1, 1, (byte) 2, "", (short) 3, 4.0),
- new SimpleTypes(1, 2, (byte) 2, "", (short) 3, 4.0),
- new SimpleTypes(1, 2, (byte) 3, "", (short) 3, 4.0),
- new SimpleTypes(1, 2, (byte) 3, "a", (short) 3, 4.0),
- new SimpleTypes(1, 2, (byte) 3, "b", (short) 3, 4.0),
- new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 4.0),
- new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 6.0)
- );
- }
-
- @Test
- public void testCompositeObject() {
- ComplexNestedObject1 o1 = new ComplexNestedObject1(-1100);
- ComplexNestedObject1 o2 = new ComplexNestedObject1(0);
- ComplexNestedObject1 o3 = new ComplexNestedObject1(44);
- ComplexNestedObject1 o4 = new ComplexNestedObject1(76923, "A");
- ComplexNestedObject1 o5 = new ComplexNestedObject1(5626435, "A somewhat random collection");
-
- runTests(o1, o2, o3, o4, o5);
- }
-
- @Test
- public void testBeanStyleObjects() {
- {
- Book b111 = new Book(-1L, "A Low level interfaces", 0xC);
- Book b122 = new Book(-1L, "Low level interfaces", 0xC);
- Book b123 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-
- Book b2 = new Book(0L, "Debugging byte streams", 1337);
- Book b3 = new Book(976243875L, "The Serialization Odysse", 42);
-
- runTests(b111, b122, b123, b2, b3);
- }
-
- {
- BookAuthor b1 = new BookAuthor(976243875L, new ArrayList<String>(), "Arno Nym");
-
- ArrayList<String> list = new ArrayList<String>();
- list.add("A");
- list.add("B");
- list.add("C");
- list.add("D");
- list.add("E");
-
- BookAuthor b2 = new BookAuthor(976243875L, list, "The Saurus");
-
- runTests(b1, b2);
- }
- }
-
- // ------------------------------------------------------------------------
-
- private final <T> void runTests(T... sortedTestData) {
- ComparatorTestInstance<T> testBase = new ComparatorTestInstance<T>(sortedTestData);
- testBase.testAll();
- }
-
- private static final <T> TypeSerializer<T> createSerializer(Class<T> type) {
- return new AvroSerializer<T>(type);
- }
-
- // ------------------------------------------------------------------------
- // test instance
- // ------------------------------------------------------------------------
-
- private class ComparatorTestInstance<T> extends ComparatorTestBase<T> {
-
- private final T[] testData;
-
- private final Class<T> type;
-
- @SuppressWarnings("unchecked")
- public ComparatorTestInstance(T[] testData) {
- if (testData == null || testData.length == 0) {
- throw new IllegalArgumentException();
- }
-
- this.testData = testData;
- this.type = (Class<T>) testData[0].getClass();
- }
-
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected TypeComparator<T> createComparator(boolean ascending) {
- return new GenericTypeComparator(ascending, GenericTypeComparatorTest.createSerializer(this.type), this.type);
- }
-
- @Override
- protected TypeSerializer<T> createSerializer() {
- return GenericTypeComparatorTest.createSerializer(this.type);
- }
-
- @Override
- protected T[] getSortedTestData() {
- return this.testData;
- }
-
- public void testAll() {
- testDuplicate();
- testEquality();
- testEqualityWithReference();
- testInequality();
- testInequalityWithReference();
- testNormalizedKeysEqualsFullLength();
- testNormalizedKeysEqualsHalfLength();
- testNormalizedKeysGreatSmallFullLength();
- testNormalizedKeysGreatSmallAscDescHalfLength();
- testNormalizedKeyReadWriter();
- }
- }
-
- // ------------------------------------------------------------------------
- // test objects
- // ------------------------------------------------------------------------
-
- public static final class SimpleTypes implements Comparable<SimpleTypes> {
-
- private final int iVal;
- private final long lVal;
- private final byte bVal;
- private final String sVal;
- private final short rVal;
- private final double dVal;
-
- public SimpleTypes() {
- this(0, 0, (byte) 0, "", (short) 0, 0);
- }
-
- public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
- this.iVal = iVal;
- this.lVal = lVal;
- this.bVal = bVal;
- this.sVal = sVal;
- this.rVal = rVal;
- this.dVal = dVal;
- }
-
- @Override
- public String toString() {
- return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == SimpleTypes.class) {
- SimpleTypes other = (SimpleTypes) obj;
-
- return other.iVal == this.iVal &&
- other.lVal == this.lVal &&
- other.bVal == this.bVal &&
- other.sVal.equals(this.sVal) &&
- other.rVal == this.rVal &&
- other.dVal == this.dVal;
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(SimpleTypes o) {
- int cmp = (this.iVal < o.iVal ? -1 : (this.iVal == o.iVal ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = (this.lVal < o.lVal ? -1 : (this.lVal == o.lVal ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = (this.bVal < o.bVal ? -1 : (this.bVal == o.bVal ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = this.sVal.compareTo(o.sVal);
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = (this.rVal < o.rVal ? -1 : (this.rVal == o.rVal ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- return (this.dVal < o.dVal ? -1 : (this.dVal == o.dVal ? 0 : 1));
- }
- }
-
- public static class ComplexNestedObject1 implements Comparable<ComplexNestedObject1> {
-
- private double doubleValue;
-
- private List<String> stringList;
-
- public ComplexNestedObject1() {
- }
-
- public ComplexNestedObject1(double value, String... listElements) {
- this.doubleValue = value;
-
- this.stringList = new ArrayList<String>();
- for (String str : listElements) {
- this.stringList.add(str);
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject1.class) {
- ComplexNestedObject1 other = (ComplexNestedObject1) obj;
- return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(ComplexNestedObject1 o) {
- int cmp = (this.doubleValue < o.doubleValue ? -1 : (this.doubleValue == o.doubleValue ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- int size = this.stringList.size();
- int otherSize = o.stringList.size();
-
- cmp = (size < otherSize ? -1 : (size == otherSize ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- for (int i = 0; i < size; i++) {
- cmp = this.stringList.get(i).compareTo(o.stringList.get(i));
- if (cmp != 0) {
- return cmp;
- }
- }
-
- return 0;
- }
- }
-
- public static class Book implements Comparable<Book> {
-
- private long bookId;
- private String title;
- private long authorId;
-
- public Book() {
- }
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == Book.class) {
- Book other = (Book) obj;
- return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(Book o) {
- int cmp = (this.bookId < o.bookId ? -1 : (this.bookId == o.bookId ? 0 : 1));
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = title.compareTo(o.title);
- if (cmp != 0) {
- return cmp;
- }
-
- return (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
- }
- }
-
- public static class BookAuthor implements Comparable<BookAuthor> {
-
- private long authorId;
- private List<String> bookTitles;
- private String authorName;
-
- public BookAuthor() {
- }
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == BookAuthor.class) {
- BookAuthor other = (BookAuthor) obj;
- return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
- other.bookTitles.equals(this.bookTitles);
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(BookAuthor o) {
- int cmp = (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
- if (cmp != 0) return cmp;
-
- int size = this.bookTitles.size();
- int oSize = o.bookTitles.size();
- cmp = (size < oSize ? -1 : (size == oSize ? 0 : 1));
- if (cmp != 0) return cmp;
-
- for (int i = 0; i < size; i++) {
- cmp = this.bookTitles.get(i).compareTo(o.bookTitles.get(i));
- if (cmp != 0) return cmp;
- }
-
- return this.authorName.compareTo(o.authorName);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
deleted file mode 100644
index 84622f0..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * A test for the {@link AvroSerializer}.
- */
-public class GenericTypeSerializerTest {
-
- private final Random rnd = new Random(349712539451944123L);
-
-
- @Test
- public void testString() {
- runTests("abc", "",
- StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
- StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
- StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
- StringUtils.getRandomString(new Random(289347567856686223L), 100 * 1024, 105 * 1024));
- }
-
- @Test
- public void testSimpleTypesObjects() {
- SimpleTypes a = new SimpleTypes();
- SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
- SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
- StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-
- runTests(a, b, c, d, e, f, g);
- }
-
- @Test
- public void testCompositeObject() {
- ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
- ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
- ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
- ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
- ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-
- runTests(o1, o2, o3, o4, o5);
- }
-
- @Test
- public void testNestedObjects() {
- ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
- ComplexNestedObject2 o2 = new ComplexNestedObject2();
- ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
- ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
-
- runTests(o1, o2, o3, o4);
- }
-
- @Test
- public void testBeanStyleObjects() {
- {
- Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
- Book b2 = new Book(0L, "Debugging byte streams", 1337);
- Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-
- runTests(b1, b2, b3);
- }
-
- // object with collection
- {
- ArrayList<String> list = new ArrayList<String>();
- list.add("A");
- list.add("B");
- list.add("C");
- list.add("D");
- list.add("E");
-
- BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
-
- ArrayList<String> list2 = new ArrayList<String>();
- BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
-
- runTests(b1, b2);
- }
- }
-
- private final <T> void runTests(T... instances) {
- if (instances == null || instances.length == 0) {
- throw new IllegalArgumentException();
- }
-
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) instances[0].getClass();
-
- AvroSerializer<T> serializer = createSerializer(clazz);
- SerializerTestInstance<T> test = new SerializerTestInstance<T>(serializer, clazz, -1, instances);
- test.testAll();
- }
-
- private final <T> AvroSerializer<T> createSerializer(Class<T> type) {
- return new AvroSerializer<T>(type);
- }
-
- // --------------------------------------------------------------------------------------------
- // Test Objects
- // --------------------------------------------------------------------------------------------
-
-
- public static final class SimpleTypes {
-
- private final int iVal;
- private final long lVal;
- private final byte bVal;
- private final String sVal;
- private final short rVal;
- private final double dVal;
-
-
- public SimpleTypes() {
- this(0, 0, (byte) 0, "", (short) 0, 0);
- }
-
- public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
- this.iVal = iVal;
- this.lVal = lVal;
- this.bVal = bVal;
- this.sVal = sVal;
- this.rVal = rVal;
- this.dVal = dVal;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == SimpleTypes.class) {
- SimpleTypes other = (SimpleTypes) obj;
-
- return other.iVal == this.iVal &&
- other.lVal == this.lVal &&
- other.bVal == this.bVal &&
- other.sVal.equals(this.sVal) &&
- other.rVal == this.rVal &&
- other.dVal == this.dVal;
-
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
- }
- }
-
- public static class ComplexNestedObject1 {
-
- private double doubleValue;
-
- private List<String> stringList;
-
- public ComplexNestedObject1() {
- }
-
- public ComplexNestedObject1(int offInit) {
- this.doubleValue = 6293485.6723 + offInit;
-
- this.stringList = new ArrayList<String>();
- this.stringList.add("A" + offInit);
- this.stringList.add("somewhat" + offInit);
- this.stringList.add("random" + offInit);
- this.stringList.add("collection" + offInit);
- this.stringList.add("of" + offInit);
- this.stringList.add("strings" + offInit);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject1.class) {
- ComplexNestedObject1 other = (ComplexNestedObject1) obj;
- return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
- } else {
- return false;
- }
- }
- }
-
- public static class ComplexNestedObject2 {
-
- private long longValue;
-
- private Map<String, ComplexNestedObject1> theMap = new HashMap<String, ComplexNestedObject1>();
-
- public ComplexNestedObject2() {
- }
-
- public ComplexNestedObject2(Random rnd) {
- this.longValue = rnd.nextLong();
-
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == ComplexNestedObject2.class) {
- ComplexNestedObject2 other = (ComplexNestedObject2) obj;
- return other.longValue == this.longValue && this.theMap.equals(other.theMap);
- } else {
- return false;
- }
- }
- }
-
- public static class Book {
-
- private long bookId;
- private String title;
- private long authorId;
-
- public Book() {
- }
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == Book.class) {
- Book other = (Book) obj;
- return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
- } else {
- return false;
- }
- }
- }
-
- public static class BookAuthor {
-
- private long authorId;
- private List<String> bookTitles;
- private String authorName;
-
- public BookAuthor() {
- }
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == BookAuthor.class) {
- BookAuthor other = (BookAuthor) obj;
- return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
- other.bookTitles.equals(this.bookTitles);
- } else {
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
new file mode 100644
index 0000000..b7fe093
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+ @Override
+ protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+ return new KryoSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
new file mode 100644
index 0000000..f87d982
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+ @Override
+ protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+ return new KryoSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
new file mode 100644
index 0000000..c4f2a65
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+ @Override
+ protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+ return new KryoSerializer<T>(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
index fa7653f..3850690 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -26,14 +26,13 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.Book;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.BookAuthor;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject1;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject2;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.SimpleTypes;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
[2/2] git commit: Default generic serializer is Avro. Minor
improvements in KryoSerializer. Add mini benchmark for Avro/Kryo.
Posted by se...@apache.org.
Default generic serializer is Avro. Minor improvements in KryoSerializer. Add mini benchmark for Avro/Kryo.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dac281f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dac281f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dac281f4
Branch: refs/heads/master
Commit: dac281f407a15020620ca74bc56303b3a4c3c850
Parents: 22203e7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 2 17:56:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 18:26:20 2014 +0200
----------------------------------------------------------------------
.../api/java/typeutils/GenericTypeInfo.java | 4 +-
.../java/typeutils/runtime/KryoSerializer.java | 30 +-
.../java/typeutils/runtime/TupleComparator.java | 1 +
.../runtime/KryoVersusAvroMinibenchmark.java | 767 +++++++++++++++++++
4 files changed, 791 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index f7f78c1..cbc36ed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
import org.apache.flink.types.TypeInformation;
@@ -64,7 +64,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
public TypeSerializer<T> createSerializer() {
- return new KryoSerializer<T>(this.typeClass);
+ return new AvroSerializer<T>(this.typeClass);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index ad7f83a..aa1e98e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -34,8 +34,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private final Class<? extends T> typeToInstantiate;
private transient Kryo kryo;
- private transient T copyInstance = null;
- private transient Input in = null;
+ private transient T copyInstance;
+
+ private transient DataOutputView previousOut;
+ private transient DataInputView previousIn;
+
+ private transient Input input;
+ private transient Output output;
public KryoSerializer(Class<T> type){
this(type,type);
@@ -84,18 +89,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
@Override
public void serialize(T record, DataOutputView target) throws IOException {
checkKryoInitialized();
- DataOutputViewStream outputStream = new DataOutputViewStream(target);
- Output out = new Output(outputStream);
- kryo.writeObject(out, record);
- out.flush();
+ if (target != previousOut) {
+ DataOutputViewStream outputStream = new DataOutputViewStream(target);
+ output = new Output(outputStream);
+ previousOut = target;
+ }
+
+ kryo.writeObject(output, record);
+ output.flush();
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
checkKryoInitialized();
- DataInputViewStream inputStream = new DataInputViewStream(source);
- Input in = new NoFetchingInput(inputStream);
- reuse = kryo.readObject(in, typeToInstantiate);
+ if (source != previousIn) {
+ DataInputViewStream inputStream = new DataInputViewStream(source);
+ input = new NoFetchingInput(inputStream);
+ previousIn = source;
+ }
+ reuse = kryo.readObject(input, typeToInstantiate);
return reuse;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index c786345..e7dd25a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -235,6 +235,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
try {
for (; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
+ @SuppressWarnings("unchecked")
int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
if (cmp != 0) {
return cmp;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
new file mode 100644
index 0000000..52c17d6
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+public class KryoVersusAvroMinibenchmark {
+
+ private static final long SEED = 94762389741692387L;
+
+ private static final Random rnd = new Random(SEED);
+
+ private static final int NUM_ELEMENTS = 100000;
+
+ private static final int NUM_RUNS = 10;
+
+
+
+ public static void main(String[] args) throws Exception {
+
+ final MyType[] elements = new MyType[NUM_ELEMENTS];
+ for (int i = 0; i < NUM_ELEMENTS; i++) {
+ elements[i] = MyType.getRandom();
+ }
+
+ final MyType dummy = new MyType();
+
+ long[] timesAvro = new long[NUM_RUNS];
+ long[] timesKryo = new long[NUM_RUNS];
+
+ for (int i = 0; i < NUM_RUNS; i++) {
+ System.out.println("----------------- Starting run " + i + " ---------------------");
+
+ System.out.println("Avro serializer");
+ {
+ final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+ final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
+
+ long start = System.nanoTime();
+
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.serialize(elements[k], outView);
+ }
+
+ final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.deserialize(dummy, inView);
+ }
+
+ long elapsed = System.nanoTime() - start;
+ System.out.println("Took: " + (elapsed / 1000000) + " msecs");
+ timesAvro[i] = elapsed;
+ }
+
+ System.gc();
+
+ System.out.println("Kryo serializer");
+ {
+ final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+ final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class);
+
+ long start = System.nanoTime();
+
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.serialize(elements[k], outView);
+ }
+
+ final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.deserialize(dummy, inView);
+ }
+
+ long elapsed = System.nanoTime() - start;
+ System.out.println("Took: " + (elapsed / 1000000) + " msecs");
+ timesKryo[i] = elapsed;
+ }
+ }
+ }
+
+
+
+
+
+ public static class MyType {
+
+ private String theString;
+
+// private Tuple2<Long, Double> theTuple;
+
+ private List<Integer> theList;
+
+
+ public MyType() {
+ theString = "";
+// theTuple = new Tuple2<Long, Double>(0L, 0.0);
+ theList = new ArrayList<Integer>();
+ }
+
+ public MyType(String theString, Tuple2<Long, Double> theTuple, List<Integer> theList) {
+ this.theString = theString;
+// this.theTuple = theTuple;
+ this.theList = theList;
+ }
+
+
+ public String getTheString() {
+ return theString;
+ }
+
+ public void setTheString(String theString) {
+ this.theString = theString;
+ }
+
+// public Tuple2<Long, Double> getTheTuple() {
+// return theTuple;
+// }
+//
+// public void setTheTuple(Tuple2<Long, Double> theTuple) {
+// this.theTuple = theTuple;
+// }
+
+ public List<Integer> getTheList() {
+ return theList;
+ }
+
+ public void setTheList(List<Integer> theList) {
+ this.theList = theList;
+ }
+
+
+ public static MyType getRandom() {
+ final int numListElements = rnd.nextInt(20);
+ List<Integer> list = new ArrayList<Integer>(numListElements);
+ for (int i = 0; i < numListElements; i++) {
+ list.add(rnd.nextInt());
+ }
+
+ return new MyType(randomString(), new Tuple2<Long, Double>(rnd.nextLong(), rnd.nextDouble()), list);
+ }
+ }
+
+
+ private static String randomString() {
+ final int len = rnd.nextInt(100) + 20;
+
+ StringBuilder bld = new StringBuilder();
+ for (int i = 0; i < len; i++) {
+ bld.append(rnd.nextInt('z' - 'a' + 1) + 'a');
+ }
+ return bld.toString();
+ }
+
+ // ============================================================================================
+ // ============================================================================================
+
+ public static final class DataOutputSerializer implements DataOutputView {
+
+ private byte[] buffer;
+
+ private int position;
+
+ private ByteBuffer wrapper;
+
+ public DataOutputSerializer(int startSize) {
+ if (startSize < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ this.buffer = new byte[startSize];
+ this.wrapper = ByteBuffer.wrap(buffer);
+ }
+
+ public ByteBuffer wrapAsByteBuffer() {
+ this.wrapper.position(0);
+ this.wrapper.limit(this.position);
+ return this.wrapper;
+ }
+
+ public void clear() {
+ this.position = 0;
+ }
+
+ public int length() {
+ return this.position;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Data Output
+ // ----------------------------------------------------------------------------------------
+
+ @Override
+ public void write(int b) throws IOException {
+ if (this.position >= this.buffer.length) {
+ resize(1);
+ }
+ this.buffer[this.position++] = (byte) (b & 0xff);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ if (this.position > this.buffer.length - len) {
+ resize(len);
+ }
+ System.arraycopy(b, off, this.buffer, this.position, len);
+ this.position += len;
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ write(v ? 1 : 0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ write(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ final int sLen = s.length();
+ if (this.position >= this.buffer.length - sLen) {
+ resize(sLen);
+ }
+
+ for (int i = 0; i < sLen; i++) {
+ writeByte(s.charAt(i));
+ }
+ this.position += sLen;
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ if (this.position >= this.buffer.length - 1) {
+ resize(2);
+ }
+ this.buffer[this.position++] = (byte) (v >> 8);
+ this.buffer[this.position++] = (byte) v;
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ final int sLen = s.length();
+ if (this.position >= this.buffer.length - 2*sLen) {
+ resize(2*sLen);
+ }
+ for (int i = 0; i < sLen; i++) {
+ writeChar(s.charAt(i));
+ }
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ @SuppressWarnings("restriction")
+ @Override
+ public void writeInt(int v) throws IOException {
+ if (this.position >= this.buffer.length - 3) {
+ resize(4);
+ }
+ if (LITTLE_ENDIAN) {
+ v = Integer.reverseBytes(v);
+ }
+ UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+ this.position += 4;
+ }
+
+ @SuppressWarnings("restriction")
+ @Override
+ public void writeLong(long v) throws IOException {
+ if (this.position >= this.buffer.length - 7) {
+ resize(8);
+ }
+ if (LITTLE_ENDIAN) {
+ v = Long.reverseBytes(v);
+ }
+ UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+ this.position += 8;
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ if (this.position >= this.buffer.length - 1) {
+ resize(2);
+ }
+ this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+ this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+ }
+
+ @Override
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535) {
+ throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+ }
+ else if (this.position > this.buffer.length - utflen - 2) {
+ resize(utflen + 2);
+ }
+
+ byte[] bytearr = this.buffer;
+ int count = this.position;
+
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ bytearr[count++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+
+ this.position = count;
+ }
+
+
+ private final void resize(int minCapacityAdd) throws IOException {
+ try {
+ final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+ final byte[] nb = new byte[newLen];
+ System.arraycopy(this.buffer, 0, nb, 0, this.position);
+ this.buffer = nb;
+ this.wrapper = ByteBuffer.wrap(this.buffer);
+ }
+ catch (NegativeArraySizeException nasex) {
+ throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
+ }
+ }
+
+ @SuppressWarnings("restriction")
+ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+ @SuppressWarnings("restriction")
+ private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ if(buffer.length - this.position < numBytes){
+ throw new EOFException("Could not skip " + numBytes + " bytes.");
+ }
+
+ this.position += numBytes;
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ if(buffer.length - this.position < numBytes){
+ throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+ }
+
+ source.read(this.buffer, this.position, numBytes);
+ this.position += numBytes;
+ }
+
+ }
+
+ public static final class DataInputDeserializer implements DataInputView {
+
+ private byte[] buffer;
+
+ private int end;
+
+ private int position;
+
+ public DataInputDeserializer() {
+ }
+
+ public DataInputDeserializer(byte[] buffer, int start, int len) {
+ setBuffer(buffer, start, len);
+ }
+
+ public DataInputDeserializer(ByteBuffer buffer) {
+ setBuffer(buffer);
+ }
+
+ public void setBuffer(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ this.buffer = buffer.array();
+ this.position = buffer.arrayOffset() + buffer.position();
+ this.end = this.position + buffer.remaining();
+ } else if (buffer.isDirect()) {
+ this.buffer = new byte[buffer.remaining()];
+ this.position = 0;
+ this.end = this.buffer.length;
+
+ buffer.get(this.buffer);
+ } else {
+ throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+ }
+ }
+
+ public void setBuffer(byte[] buffer, int start, int len) {
+ if (buffer == null) {
+ throw new NullPointerException();
+ }
+
+ if (start < 0 || len < 0 || start + len >= buffer.length) {
+ throw new IllegalArgumentException();
+ }
+
+ this.buffer = buffer;
+ this.position = start;
+ this.end = start * len;
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Data Input
+ // ----------------------------------------------------------------------------------------
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++] != 0;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++];
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ if (this.position < this.end - 1) {
+ return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ if (len >= 0) {
+ if (off <= b.length - len) {
+ if (this.position <= this.end - len) {
+ System.arraycopy(this.buffer, position, b, off, len);
+ position += len;
+ } else {
+ throw new EOFException();
+ }
+ } else {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ } else if (len < 0) {
+ throw new IllegalArgumentException("Length may not be negative.");
+ }
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ if (this.position >= 0 && this.position < this.end - 3) {
+ @SuppressWarnings("restriction")
+ int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Integer.reverseBytes(value);
+ }
+
+ this.position += 4;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (this.position < this.end) {
+ // read until a newline is found
+ StringBuilder bld = new StringBuilder();
+ char curr = (char) readUnsignedByte();
+ while (position < this.end && curr != '\n') {
+ bld.append(curr);
+ curr = (char) readUnsignedByte();
+ }
+ // trim a trailing carriage return
+ int len = bld.length();
+ if (len > 0 && bld.charAt(len - 1) == '\r') {
+ bld.setLength(len - 1);
+ }
+ String s = bld.toString();
+ bld.setLength(0);
+ return s;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ if (position >= 0 && position < this.end - 7) {
+ @SuppressWarnings("restriction")
+ long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Long.reverseBytes(value);
+ }
+ this.position += 8;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ if (position >= 0 && position < this.end - 1) {
+ return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ int utflen = readUnsignedShort();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararr_count = 0;
+
+ readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararr_count++] = (char) c;
+ }
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararr_count++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ if (this.position < this.end) {
+ return (this.buffer[this.position++] & 0xff);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ if (this.position < this.end - 1) {
+ return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ if (this.position <= this.end - n) {
+ this.position += n;
+ return n;
+ } else {
+ n = this.end - this.position;
+ this.position = this.end;
+ return n;
+ }
+ }
+
+ @SuppressWarnings("restriction")
+ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+ @SuppressWarnings("restriction")
+ private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ int skippedBytes = skipBytes(numBytes);
+
+ if(skippedBytes < numBytes){
+ throw new EOFException("Could not skip " + numBytes +" bytes.");
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if(b == null){
+ throw new NullPointerException("Byte array b cannot be null.");
+ }
+
+ if(off < 0){
+ throw new IndexOutOfBoundsException("Offset cannot be negative.");
+ }
+
+ if(len < 0){
+ throw new IndexOutOfBoundsException("Length cannot be negative.");
+ }
+
+ if(b.length - off < len){
+ throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+ ".");
+ }
+
+ if(this.position >= this.end){
+ return -1;
+ }else{
+ int toRead = Math.min(this.end-this.position, len);
+ System.arraycopy(this.buffer,this.position,b,off,toRead);
+ this.position += toRead;
+
+ return toRead;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+ }
+}