You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/29 13:22:36 UTC
[03/13] flink git commit: [FLINK-2920] [tests] Port
KryoVersusAvroMinibenchmark to JMH.
[FLINK-2920] [tests] Port KryoVersusAvroMinibenchmark to JMH.
This closes #1302
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8f1be92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8f1be92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8f1be92
Branch: refs/heads/master
Commit: a8f1be923a4c4954fb0ae71a9c0ee672aaf2d797
Parents: 3abbcd1
Author: gallenvara <ga...@126.com>
Authored: Tue Oct 27 11:27:03 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 29 10:32:21 2015 +0100
----------------------------------------------------------------------
flink-benchmark/pom.xml | 173 +++---
.../kryo/KryoVersusAvroMinibenchmark.java | 518 ++++++++++++++++++
.../kryo/KryoVersusAvroMinibenchmark.java | 522 -------------------
3 files changed, 604 insertions(+), 609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a8f1be92/flink-benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml
index 6b2c13d..c60d522 100644
--- a/flink-benchmark/pom.xml
+++ b/flink-benchmark/pom.xml
@@ -16,99 +16,98 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>flink-benchmark</artifactId>
- <name>flink-benchmark</name>
- <packaging>jar</packaging>
+ <artifactId>flink-benchmark</artifactId>
+ <name>flink-benchmark</name>
+ <packaging>jar</packaging>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <jmh.version>1.11</jmh.version>
- <uberjar.name>benchmarks</uberjar.name>
- </properties>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <jmh.version>1.11</jmh.version>
+ <uberjar.name>benchmarks</uberjar.name>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-core</artifactId>
- <version>${jmh.version}</version>
- </dependency>
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-generator-annprocess</artifactId>
- <version>${jmh.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <version>1.0-SNAPSHOT</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <finalName>${uberjar.name}</finalName>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.openjdk.jmh.Main</mainClass>
- </transformer>
- </transformers>
- <filters>
- <filter>
- <!--
- Shading signed JARs will fail without this.
- http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
- -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${uberjar.name}</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.openjdk.jmh.Main</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <!--
+ Shading signed JARs will fail without this.
+ http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+ -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a8f1be92/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
new file mode 100644
index 0000000..98af91e
--- /dev/null
+++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.api.java.typeutils.runtime.kryo;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemoryUtils;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class KryoVersusAvroMinibenchmark {
+
+ private static final long SEED = 94762389741692387L;
+
+ private static final Random rnd = new Random(SEED);
+
+ private static final int NUM_ELEMENTS = 100000;
+
+ @Param({"1","2","3","4","5","6","7","8","9","10"})
+ private static int runTime;
+
+ private MyType[] elements ;
+
+ private MyType dummy;
+
+ @Setup
+ public void init() {
+ this.elements = new MyType[NUM_ELEMENTS];
+ for (int i = 0; i < NUM_ELEMENTS; i++) {
+ this.elements[i] = MyType.getRandom();
+ }
+ this.dummy = new MyType();
+ }
+
+ @Benchmark
+ public void avroSerializer() throws IOException {
+ final TestDataOutputSerializer outView = new TestDataOutputSerializer(10000000);
+ final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
+
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.serialize(elements[k], outView);
+ }
+
+ final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.deserialize(dummy, inView);
+ }
+ }
+
+ @Benchmark
+ public void kryoSerializer() throws IOException {
+ final TestDataOutputSerializer outView = new TestDataOutputSerializer(10000000);
+ ExecutionConfig conf = new ExecutionConfig();
+ conf.registerKryoType(MyType.class);
+ conf.enableForceKryo();
+ TypeInformation<MyType> typeInfo = new GenericTypeInfo<MyType>(MyType.class);
+ final TypeSerializer<MyType> serializer = typeInfo.createSerializer(conf);
+
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.serialize(elements[k], outView);
+ }
+
+ final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+ for (int k = 0; k < NUM_ELEMENTS; k++) {
+ serializer.deserialize(dummy, inView);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Options opt = new OptionsBuilder()
+ .include(KryoVersusAvroMinibenchmark.class.getSimpleName())
+ .warmupIterations(2)
+ .measurementIterations(2)
+ .forks(1)
+ .build();
+ new Runner(opt).run();
+ }
+
+ public static class MyType {
+
+ private String theString;
+
+// private Tuple2<Long, Double> theTuple;
+
+ private List<Integer> theList;
+
+
+ public MyType() {
+ theString = "";
+// theTuple = new Tuple2<Long, Double>(0L, 0.0);
+ theList = new ArrayList<Integer>();
+ }
+
+ public MyType(String theString, Tuple2<Long, Double> theTuple, List<Integer> theList) {
+ this.theString = theString;
+// this.theTuple = theTuple;
+ this.theList = theList;
+ }
+
+
+ public String getTheString() {
+ return theString;
+ }
+
+ public void setTheString(String theString) {
+ this.theString = theString;
+ }
+
+// public Tuple2<Long, Double> getTheTuple() {
+// return theTuple;
+// }
+//
+// public void setTheTuple(Tuple2<Long, Double> theTuple) {
+// this.theTuple = theTuple;
+// }
+
+ public List<Integer> getTheList() {
+ return theList;
+ }
+
+ public void setTheList(List<Integer> theList) {
+ this.theList = theList;
+ }
+
+
+ public static MyType getRandom() {
+ final int numListElements = rnd.nextInt(20);
+ List<Integer> list = new ArrayList<Integer>(numListElements);
+ for (int i = 0; i < numListElements; i++) {
+ list.add(rnd.nextInt());
+ }
+
+ return new MyType(randomString(), new Tuple2<Long, Double>(rnd.nextLong(), rnd.nextDouble()), list);
+ }
+ }
+
+
+ private static String randomString() {
+ final int len = rnd.nextInt(100) + 20;
+
+ StringBuilder bld = new StringBuilder();
+ for (int i = 0; i < len; i++) {
+ bld.append(rnd.nextInt('z' - 'a' + 1) + 'a');
+ }
+ return bld.toString();
+ }
+
+ // ============================================================================================
+ // ============================================================================================
+
+ public static final class DataInputDeserializer implements DataInputView {
+
+ private byte[] buffer;
+
+ private int end;
+
+ private int position;
+
+ public DataInputDeserializer() {
+ }
+
+ public DataInputDeserializer(byte[] buffer, int start, int len) {
+ setBuffer(buffer, start, len);
+ }
+
+ public DataInputDeserializer(ByteBuffer buffer) {
+ setBuffer(buffer);
+ }
+
+ public void setBuffer(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ this.buffer = buffer.array();
+ this.position = buffer.arrayOffset() + buffer.position();
+ this.end = this.position + buffer.remaining();
+ } else if (buffer.isDirect()) {
+ this.buffer = new byte[buffer.remaining()];
+ this.position = 0;
+ this.end = this.buffer.length;
+
+ buffer.get(this.buffer);
+ } else {
+ throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+ }
+ }
+
+ public void setBuffer(byte[] buffer, int start, int len) {
+ if (buffer == null) {
+ throw new NullPointerException();
+ }
+
+ if (start < 0 || len < 0 || start + len >= buffer.length) {
+ throw new IllegalArgumentException();
+ }
+
+ this.buffer = buffer;
+ this.position = start;
+ this.end = start * len;
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Data Input
+ // ----------------------------------------------------------------------------------------
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++] != 0;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++];
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ if (this.position < this.end - 1) {
+ return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ if (len >= 0) {
+ if (off <= b.length - len) {
+ if (this.position <= this.end - len) {
+ System.arraycopy(this.buffer, position, b, off, len);
+ position += len;
+ } else {
+ throw new EOFException();
+ }
+ } else {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ } else if (len < 0) {
+ throw new IllegalArgumentException("Length may not be negative.");
+ }
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ if (this.position >= 0 && this.position < this.end - 3) {
+ @SuppressWarnings("restriction")
+ int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Integer.reverseBytes(value);
+ }
+
+ this.position += 4;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (this.position < this.end) {
+ // read until a newline is found
+ StringBuilder bld = new StringBuilder();
+ char curr = (char) readUnsignedByte();
+ while (position < this.end && curr != '\n') {
+ bld.append(curr);
+ curr = (char) readUnsignedByte();
+ }
+ // trim a trailing carriage return
+ int len = bld.length();
+ if (len > 0 && bld.charAt(len - 1) == '\r') {
+ bld.setLength(len - 1);
+ }
+ String s = bld.toString();
+ bld.setLength(0);
+ return s;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ if (position >= 0 && position < this.end - 7) {
+ @SuppressWarnings("restriction")
+ long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Long.reverseBytes(value);
+ }
+ this.position += 8;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ if (position >= 0 && position < this.end - 1) {
+ return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ int utflen = readUnsignedShort();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararr_count = 0;
+
+ readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararr_count++] = (char) c;
+ }
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararr_count++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ if (this.position < this.end) {
+ return (this.buffer[this.position++] & 0xff);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ if (this.position < this.end - 1) {
+ return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ if (this.position <= this.end - n) {
+ this.position += n;
+ return n;
+ } else {
+ n = this.end - this.position;
+ this.position = this.end;
+ return n;
+ }
+ }
+
+ @SuppressWarnings("restriction")
+ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+ @SuppressWarnings("restriction")
+ private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ int skippedBytes = skipBytes(numBytes);
+
+ if (skippedBytes < numBytes){
+ throw new EOFException("Could not skip " + numBytes +" bytes.");
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException("Byte array b cannot be null.");
+ }
+
+ if (off < 0) {
+ throw new IndexOutOfBoundsException("Offset cannot be negative.");
+ }
+
+ if (len < 0) {
+ throw new IndexOutOfBoundsException("Length cannot be negative.");
+ }
+
+ if (b.length - off < len) {
+ throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+ ".");
+ }
+
+ if (this.position >= this.end) {
+ return -1;
+ } else {
+ int toRead = Math.min(this.end-this.position, len);
+ System.arraycopy(this.buffer,this.position,b,off,toRead);
+ this.position += toRead;
+
+ return toRead;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a8f1be92/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
deleted file mode 100644
index 99cfe79..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-public class KryoVersusAvroMinibenchmark {
-
- private static final long SEED = 94762389741692387L;
-
- private static final Random rnd = new Random(SEED);
-
- private static final int NUM_ELEMENTS = 100000;
-
- private static final int NUM_RUNS = 10;
-
-
-
- public static void main(String[] args) throws Exception {
-
- final MyType[] elements = new MyType[NUM_ELEMENTS];
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- elements[i] = MyType.getRandom();
- }
-
- final MyType dummy = new MyType();
-
- long[] timesAvro = new long[NUM_RUNS];
- long[] timesKryo = new long[NUM_RUNS];
-
- for (int i = 0; i < NUM_RUNS; i++) {
- System.out.println("----------------- Starting run " + i + " ---------------------");
-
- System.out.println("Avro serializer");
- {
- final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
- final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
-
- long start = System.nanoTime();
-
- for (int k = 0; k < NUM_ELEMENTS; k++) {
- serializer.serialize(elements[k], outView);
- }
-
- final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
- for (int k = 0; k < NUM_ELEMENTS; k++) {
- serializer.deserialize(dummy, inView);
- }
-
- long elapsed = System.nanoTime() - start;
- System.out.println("Took: " + (elapsed / 1000000) + " msecs");
- timesAvro[i] = elapsed;
- }
-
- System.gc();
-
- System.out.println("Kryo serializer");
- {
- final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
- ExecutionConfig conf = new ExecutionConfig();
- conf.registerKryoType(MyType.class);
- conf.enableForceKryo();
- TypeInformation<MyType> typeInfo = new GenericTypeInfo<MyType>(MyType.class);
- final TypeSerializer<MyType> serializer = typeInfo.createSerializer(conf);
-
- long start = System.nanoTime();
-
- for (int k = 0; k < NUM_ELEMENTS; k++) {
- serializer.serialize(elements[k], outView);
- }
-
- final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
- for (int k = 0; k < NUM_ELEMENTS; k++) {
- serializer.deserialize(dummy, inView);
- }
-
- long elapsed = System.nanoTime() - start;
- System.out.println("Took: " + (elapsed / 1000000) + " msecs");
- timesKryo[i] = elapsed;
- }
- }
- }
-
-
-
-
-
- public static class MyType {
-
- private String theString;
-
-// private Tuple2<Long, Double> theTuple;
-
- private List<Integer> theList;
-
-
- public MyType() {
- theString = "";
-// theTuple = new Tuple2<Long, Double>(0L, 0.0);
- theList = new ArrayList<Integer>();
- }
-
- public MyType(String theString, Tuple2<Long, Double> theTuple, List<Integer> theList) {
- this.theString = theString;
-// this.theTuple = theTuple;
- this.theList = theList;
- }
-
-
- public String getTheString() {
- return theString;
- }
-
- public void setTheString(String theString) {
- this.theString = theString;
- }
-
-// public Tuple2<Long, Double> getTheTuple() {
-// return theTuple;
-// }
-//
-// public void setTheTuple(Tuple2<Long, Double> theTuple) {
-// this.theTuple = theTuple;
-// }
-
- public List<Integer> getTheList() {
- return theList;
- }
-
- public void setTheList(List<Integer> theList) {
- this.theList = theList;
- }
-
-
- public static MyType getRandom() {
- final int numListElements = rnd.nextInt(20);
- List<Integer> list = new ArrayList<Integer>(numListElements);
- for (int i = 0; i < numListElements; i++) {
- list.add(rnd.nextInt());
- }
-
- return new MyType(randomString(), new Tuple2<Long, Double>(rnd.nextLong(), rnd.nextDouble()), list);
- }
- }
-
-
- private static String randomString() {
- final int len = rnd.nextInt(100) + 20;
-
- StringBuilder bld = new StringBuilder();
- for (int i = 0; i < len; i++) {
- bld.append(rnd.nextInt('z' - 'a' + 1) + 'a');
- }
- return bld.toString();
- }
-
- // ============================================================================================
- // ============================================================================================
-
- public static final class DataInputDeserializer implements DataInputView {
-
- private byte[] buffer;
-
- private int end;
-
- private int position;
-
- public DataInputDeserializer() {
- }
-
- public DataInputDeserializer(byte[] buffer, int start, int len) {
- setBuffer(buffer, start, len);
- }
-
- public DataInputDeserializer(ByteBuffer buffer) {
- setBuffer(buffer);
- }
-
- public void setBuffer(ByteBuffer buffer) {
- if (buffer.hasArray()) {
- this.buffer = buffer.array();
- this.position = buffer.arrayOffset() + buffer.position();
- this.end = this.position + buffer.remaining();
- } else if (buffer.isDirect()) {
- this.buffer = new byte[buffer.remaining()];
- this.position = 0;
- this.end = this.buffer.length;
-
- buffer.get(this.buffer);
- } else {
- throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
- }
- }
-
- public void setBuffer(byte[] buffer, int start, int len) {
- if (buffer == null) {
- throw new NullPointerException();
- }
-
- if (start < 0 || len < 0 || start + len >= buffer.length) {
- throw new IllegalArgumentException();
- }
-
- this.buffer = buffer;
- this.position = start;
- this.end = start * len;
- }
-
- // ----------------------------------------------------------------------------------------
- // Data Input
- // ----------------------------------------------------------------------------------------
-
- @Override
- public boolean readBoolean() throws IOException {
- if (this.position < this.end) {
- return this.buffer[this.position++] != 0;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public byte readByte() throws IOException {
- if (this.position < this.end) {
- return this.buffer[this.position++];
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public char readChar() throws IOException {
- if (this.position < this.end - 1) {
- return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public double readDouble() throws IOException {
- return Double.longBitsToDouble(readLong());
- }
-
- @Override
- public float readFloat() throws IOException {
- return Float.intBitsToFloat(readInt());
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- if (len >= 0) {
- if (off <= b.length - len) {
- if (this.position <= this.end - len) {
- System.arraycopy(this.buffer, position, b, off, len);
- position += len;
- } else {
- throw new EOFException();
- }
- } else {
- throw new ArrayIndexOutOfBoundsException();
- }
- } else if (len < 0) {
- throw new IllegalArgumentException("Length may not be negative.");
- }
- }
-
- @Override
- public int readInt() throws IOException {
- if (this.position >= 0 && this.position < this.end - 3) {
- @SuppressWarnings("restriction")
- int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
- if (LITTLE_ENDIAN) {
- value = Integer.reverseBytes(value);
- }
-
- this.position += 4;
- return value;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public String readLine() throws IOException {
- if (this.position < this.end) {
- // read until a newline is found
- StringBuilder bld = new StringBuilder();
- char curr = (char) readUnsignedByte();
- while (position < this.end && curr != '\n') {
- bld.append(curr);
- curr = (char) readUnsignedByte();
- }
- // trim a trailing carriage return
- int len = bld.length();
- if (len > 0 && bld.charAt(len - 1) == '\r') {
- bld.setLength(len - 1);
- }
- String s = bld.toString();
- bld.setLength(0);
- return s;
- } else {
- return null;
- }
- }
-
- @Override
- public long readLong() throws IOException {
- if (position >= 0 && position < this.end - 7) {
- @SuppressWarnings("restriction")
- long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
- if (LITTLE_ENDIAN) {
- value = Long.reverseBytes(value);
- }
- this.position += 8;
- return value;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public short readShort() throws IOException {
- if (position >= 0 && position < this.end - 1) {
- return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public String readUTF() throws IOException {
- int utflen = readUnsignedShort();
- byte[] bytearr = new byte[utflen];
- char[] chararr = new char[utflen];
-
- int c, char2, char3;
- int count = 0;
- int chararr_count = 0;
-
- readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararr_count++] = (char) c;
- }
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararr_count++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException("malformed input around byte " + (count - 1));
- }
- chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- if (this.position < this.end) {
- return (this.buffer[this.position++] & 0xff);
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- if (this.position < this.end - 1) {
- return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- if (this.position <= this.end - n) {
- this.position += n;
- return n;
- } else {
- n = this.end - this.position;
- this.position = this.end;
- return n;
- }
- }
-
- @SuppressWarnings("restriction")
- private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
- @SuppressWarnings("restriction")
- private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
- private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- int skippedBytes = skipBytes(numBytes);
-
- if (skippedBytes < numBytes){
- throw new EOFException("Could not skip " + numBytes +" bytes.");
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (b == null) {
- throw new NullPointerException("Byte array b cannot be null.");
- }
-
- if (off < 0) {
- throw new IndexOutOfBoundsException("Offset cannot be negative.");
- }
-
- if (len < 0) {
- throw new IndexOutOfBoundsException("Length cannot be negative.");
- }
-
- if (b.length - off < len) {
- throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
- ".");
- }
-
- if (this.position >= this.end) {
- return -1;
- } else {
- int toRead = Math.min(this.end-this.position, len);
- System.arraycopy(this.buffer,this.position,b,off,toRead);
- this.position += toRead;
-
- return toRead;
- }
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
- }
-}