You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/11/11 17:29:00 UTC

[4/4] flink git commit: [license] remove flink-benchmark due to licensing issues

[license] remove flink-benchmark due to licensing issues

In addition to dd66e61ecc5da5b15a610f04b98c8386d141f910,
removes the left-over source files.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2063fa12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2063fa12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2063fa12

Branch: refs/heads/master
Commit: 2063fa12d520818569e5cbb88d4123716f2da249
Parents: 6019f08
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Nov 11 17:03:06 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Nov 11 17:08:35 2015 +0100

----------------------------------------------------------------------
 .../runtime/FieldAccessMinibenchmark.java       |  170 --
 .../kryo/KryoVersusAvroMinibenchmark.java       |  518 ------
 .../memory/LongSerializationSpeedBenchmark.java |  225 ---
 .../StringSerializationSpeedBenchmark.java      |  209 ---
 .../segments/CoreMemorySegmentOutView.java      |  360 ----
 .../segments/MemorySegmentSpeedBenchmark.java   | 1633 ------------------
 .../memory/segments/PureHeapMemorySegment.java  |  466 -----
 .../segments/PureHeapMemorySegmentOutView.java  |  359 ----
 .../segments/PureHybridMemorySegment.java       |  887 ----------
 .../PureHybridMemorySegmentOutView.java         |  359 ----
 .../segments/PureOffHeapMemorySegment.java      |  790 ---------
 .../PureOffHeapMemorySegmentOutView.java        |  359 ----
 .../IOManagerPerformanceBenchmark.java          |  608 -------
 .../MutableHashTablePerformanceBenchmark.java   |  359 ----
 14 files changed, 7302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2063fa12/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/FieldAccessMinibenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
deleted file mode 100644
index 0a434e9..0000000
--- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.benchmark.api.java.typeutils.runtime;
-
-import org.openjdk.jmh.annotations.*;
-import org.openjdk.jmh.results.RunResult;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-@State(Scope.Thread)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class FieldAccessMinibenchmark {
-
-	private final long RUNS = 1000000000L;
-	static Field wordDescField;
-	static Field wordField;
-	static {
-		try {
-			wordDescField = WC.class.getField("wordDesc");
-			wordField = ComplexWordDescriptor.class.getField("word");
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
-
-	@Setup
-	public void warmUp() throws NoSuchFieldException,IllegalAccessException{
-		WC word0 = new WC(14, "Hallo");
-		WC word1 = new WC(3, "Hola");
-		for (long i = 0; i < 100000000; i++) {
-			compareCodeGenPublicFields(word0, word1);
-			compareCodeGenMethods(word0, word1);
-			compareReflective(word0, word1);
-		}
-	}
-
-	public static class ComplexWordDescriptor {
-		public String word;
-
-		public String getWord() {
-			return word;
-		}
-	}
-
-	public static class WC {
-		public int count;
-		public ComplexWordDescriptor wordDesc;
-
-		public WC(int c, String s) throws NoSuchFieldException,
-				SecurityException {
-			this.count = c;
-			this.wordDesc = new ComplexWordDescriptor();
-			this.wordDesc.word = s;
-		}
-
-		public ComplexWordDescriptor getWordDesc() {
-			return wordDesc;
-		}
-
-	}
-
-	public static int compareCodeGenPublicFields(WC w1, WC w2) {
-		return w1.wordDesc.word.compareTo(w2.wordDesc.word);
-	}
-
-	public static int compareCodeGenMethods(WC w1, WC w2) {
-		return w1.getWordDesc().getWord().compareTo(w2.getWordDesc().getWord());
-	}
-
-	public static int compareReflective(WC w1, WC w2)
-			throws IllegalArgumentException, IllegalAccessException {
-		// get String of w1
-		Object wordDesc1 = wordDescField.get(w1);
-		String word2cmp1 = (String) wordField.get(wordDesc1);
-
-		// get String of w2
-		Object wordDesc2 = wordDescField.get(w2);
-		String word2cmp2 = (String) wordField.get(wordDesc2);
-
-		return word2cmp1.compareTo(word2cmp2);
-	}
-
-	@Benchmark
-	public void codeGenPublicFields() throws NoSuchFieldException {
-		WC word0 = new WC(14, "Hallo");
-		WC word1 = new WC(3, "Hola");
-		for (long i = 0; i < RUNS; i++) {
-			int a = compareCodeGenPublicFields(word0, word1);
-			if (a == 0) {
-				System.err.println("hah");
-			}
-		}
-	}
-
-	@Benchmark
-	public void codeGenMethods() throws NoSuchFieldException{
-		WC word0 = new WC(14, "Hallo");
-		WC word1 = new WC(3, "Hola");
-		for (long i = 0; i < RUNS; i++) {
-			int a = compareCodeGenPublicFields(word0, word1);
-			if (a == 0) {
-				System.err.println("hah");
-			}
-		}
-	}
-
-	@Benchmark
-	public void reflection() throws NoSuchFieldException,IllegalAccessException {
-		WC word0 = new WC(14, "Hallo");
-		WC word1 = new WC(3, "Hola");
-		for (long i = 0; i < RUNS; i++) {
-			int a = compareReflective(word0, word1);
-			if (a == 0) {
-				System.err.println("hah");
-			}
-		}
-	}
-
-	/**
-	 * results on Core i7 2600k
-	 *
-	 *
-	 * warming up Code gen 5019 Reflection 20364 Factor = 4.057382
-	 */
-	public static void main(String[] args) throws RunnerException {
-
-		Options opt = new OptionsBuilder()
-				.include(FieldAccessMinibenchmark.class.getSimpleName())
-				.warmupIterations(2)
-				.measurementIterations(2)
-				.forks(1)
-				.build();
-		Collection<RunResult> results = new Runner(opt).run();
-		double[] score = new double[3];
-		int count = 0;
-		final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
-		String jvm = bean.getVmName() + " - " + bean.getVmVendor() + " - "
-				+ bean.getSpecVersion() + '/' + bean.getVmVersion();
-		System.err.println("Jvm info : " + jvm);
-		for (RunResult r : results) {
-			score[count++] = r.getPrimaryResult().getScore();
-		}
-		System.err.println("Factor vs public = " + score[2] / score[1]);
-		System.err.println("Factor vs methods = " + score[2] / score[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2063fa12/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
deleted file mode 100644
index 98af91e..0000000
--- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/api/java/typeutils/runtime/kryo/KryoVersusAvroMinibenchmark.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark.api.java.typeutils.runtime.kryo;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemoryUtils;
-import org.openjdk.jmh.annotations.*;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-@State(Scope.Thread)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class KryoVersusAvroMinibenchmark {
-
-	private static final long SEED = 94762389741692387L;
-	
-	private static final Random rnd = new Random(SEED);
-	
-	private static final int NUM_ELEMENTS = 100000;
-
-	@Param({"1","2","3","4","5","6","7","8","9","10"})
-	private static int runTime;
-
-	private MyType[] elements ;
-
-	private MyType dummy;
-
-	@Setup
-	public void init() {
-		this.elements = new MyType[NUM_ELEMENTS];
-		for (int i = 0; i < NUM_ELEMENTS; i++) {
-			this.elements[i] = MyType.getRandom();
-		}
-		this.dummy = new MyType();
-	}
-
-	@Benchmark
-	public void avroSerializer() throws IOException {
-		final TestDataOutputSerializer outView = new TestDataOutputSerializer(10000000);
-		final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
-
-		for (int k = 0; k < NUM_ELEMENTS; k++) {
-			serializer.serialize(elements[k], outView);
-		}
-
-		final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
-		for (int k = 0; k < NUM_ELEMENTS; k++) {
-			serializer.deserialize(dummy, inView);
-		}
-	}
-
-	@Benchmark
-	public void kryoSerializer() throws IOException {
-		final TestDataOutputSerializer outView = new TestDataOutputSerializer(10000000);
-		ExecutionConfig conf = new ExecutionConfig();
-		conf.registerKryoType(MyType.class);
-		conf.enableForceKryo();
-		TypeInformation<MyType> typeInfo = new GenericTypeInfo<MyType>(MyType.class);
-		final TypeSerializer<MyType> serializer = typeInfo.createSerializer(conf);
-
-		for (int k = 0; k < NUM_ELEMENTS; k++) {
-			serializer.serialize(elements[k], outView);
-		}
-
-		final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
-		for (int k = 0; k < NUM_ELEMENTS; k++) {
-			serializer.deserialize(dummy, inView);
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		Options opt = new OptionsBuilder()
-				.include(KryoVersusAvroMinibenchmark.class.getSimpleName())
-				.warmupIterations(2)
-				.measurementIterations(2)
-				.forks(1)
-				.build();
-		new Runner(opt).run();
-	}
-
-	public static class MyType {
-		
-		private String theString;
-		
-//		private Tuple2<Long, Double> theTuple;
-		
-		private List<Integer> theList;
-
-		
-		public MyType() {
-			theString = "";
-//			theTuple = new Tuple2<Long, Double>(0L, 0.0);
-			theList = new ArrayList<Integer>();
-		}
-		
-		public MyType(String theString, Tuple2<Long, Double> theTuple, List<Integer> theList) {
-			this.theString = theString;
-//			this.theTuple = theTuple;
-			this.theList = theList;
-		}
-
-		
-		public String getTheString() {
-			return theString;
-		}
-
-		public void setTheString(String theString) {
-			this.theString = theString;
-		}
-
-//		public Tuple2<Long, Double> getTheTuple() {
-//			return theTuple;
-//		}
-//
-//		public void setTheTuple(Tuple2<Long, Double> theTuple) {
-//			this.theTuple = theTuple;
-//		}
-
-		public List<Integer> getTheList() {
-			return theList;
-		}
-
-		public void setTheList(List<Integer> theList) {
-			this.theList = theList;
-		}
-		
-		
-		public static MyType getRandom() {
-			final int numListElements = rnd.nextInt(20);
-			List<Integer> list = new ArrayList<Integer>(numListElements);
-			for (int i = 0; i < numListElements; i++) {
-				list.add(rnd.nextInt());
-			}
-			
-			return new MyType(randomString(), new Tuple2<Long, Double>(rnd.nextLong(), rnd.nextDouble()), list);
-		}
-	}
-	
-	
-	private static String randomString() {
-		final int len = rnd.nextInt(100) + 20;
-		
-		StringBuilder bld = new StringBuilder();
-		for (int i = 0; i < len; i++) {
-			bld.append(rnd.nextInt('z' - 'a' + 1) + 'a');
-		}
-		return bld.toString();
-	}
-	
-	// ============================================================================================
-	// ============================================================================================
-	
-	public static final class DataInputDeserializer implements DataInputView {
-		
-		private byte[] buffer;
-		
-		private int end;
-
-		private int position;
-
-		public DataInputDeserializer() {
-		}
-		
-		public DataInputDeserializer(byte[] buffer, int start, int len) {
-			setBuffer(buffer, start, len);
-		}
-		
-		public DataInputDeserializer(ByteBuffer buffer) {
-			setBuffer(buffer);
-		}
-
-		public void setBuffer(ByteBuffer buffer) {
-			if (buffer.hasArray()) {
-				this.buffer = buffer.array();
-				this.position = buffer.arrayOffset() + buffer.position();
-				this.end = this.position + buffer.remaining();
-			} else if (buffer.isDirect()) {
-				this.buffer = new byte[buffer.remaining()];
-				this.position = 0;
-				this.end = this.buffer.length;
-
-				buffer.get(this.buffer);
-			} else {
-				throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
-			}
-		}
-
-		public void setBuffer(byte[] buffer, int start, int len) {
-			if (buffer == null) {
-				throw new NullPointerException();
-			}
-
-			if (start < 0 || len < 0 || start + len >= buffer.length) {
-				throw new IllegalArgumentException();
-			}
-
-			this.buffer = buffer;
-			this.position = start;
-			this.end = start * len;
-		}
-
-		// ----------------------------------------------------------------------------------------
-		//                               Data Input
-		// ----------------------------------------------------------------------------------------
-		
-		@Override
-		public boolean readBoolean() throws IOException {
-			if (this.position < this.end) {
-				return this.buffer[this.position++] != 0;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public byte readByte() throws IOException {
-			if (this.position < this.end) {
-				return this.buffer[this.position++];
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public char readChar() throws IOException {
-			if (this.position < this.end - 1) {
-				return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public double readDouble() throws IOException {
-			return Double.longBitsToDouble(readLong());
-		}
-
-		@Override
-		public float readFloat() throws IOException {
-			return Float.intBitsToFloat(readInt());
-		}
-
-		@Override
-		public void readFully(byte[] b) throws IOException {
-			readFully(b, 0, b.length);
-		}
-
-		@Override
-		public void readFully(byte[] b, int off, int len) throws IOException {
-			if (len >= 0) {
-				if (off <= b.length - len) {
-					if (this.position <= this.end - len) {
-						System.arraycopy(this.buffer, position, b, off, len);
-						position += len;
-					} else {
-						throw new EOFException();
-					}
-				} else {
-					throw new ArrayIndexOutOfBoundsException();
-				}
-			} else if (len < 0) {
-				throw new IllegalArgumentException("Length may not be negative.");
-			}
-		}
-
-		@Override
-		public int readInt() throws IOException {
-			if (this.position >= 0 && this.position < this.end - 3) {
-				@SuppressWarnings("restriction")
-				int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
-				if (LITTLE_ENDIAN) {
-					value = Integer.reverseBytes(value);
-				}
-				
-				this.position += 4;
-				return value;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public String readLine() throws IOException {
-			if (this.position < this.end) {
-				// read until a newline is found
-				StringBuilder bld = new StringBuilder();
-				char curr = (char) readUnsignedByte();
-				while (position < this.end && curr != '\n') {
-					bld.append(curr);
-					curr = (char) readUnsignedByte();
-				}
-				// trim a trailing carriage return
-				int len = bld.length();
-				if (len > 0 && bld.charAt(len - 1) == '\r') {
-					bld.setLength(len - 1);
-				}
-				String s = bld.toString();
-				bld.setLength(0);
-				return s;
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public long readLong() throws IOException {
-			if (position >= 0 && position < this.end - 7) {
-				@SuppressWarnings("restriction")
-				long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
-				if (LITTLE_ENDIAN) {
-					value = Long.reverseBytes(value);
-				}
-				this.position += 8;
-				return value;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public short readShort() throws IOException {
-			if (position >= 0 && position < this.end - 1) {
-				return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public String readUTF() throws IOException {
-			int utflen = readUnsignedShort();
-			byte[] bytearr = new byte[utflen];
-			char[] chararr = new char[utflen];
-
-			int c, char2, char3;
-			int count = 0;
-			int chararr_count = 0;
-
-			readFully(bytearr, 0, utflen);
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				if (c > 127) {
-					break;
-				}
-				count++;
-				chararr[chararr_count++] = (char) c;
-			}
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				switch (c >> 4) {
-				case 0:
-				case 1:
-				case 2:
-				case 3:
-				case 4:
-				case 5:
-				case 6:
-				case 7:
-					/* 0xxxxxxx */
-					count++;
-					chararr[chararr_count++] = (char) c;
-					break;
-				case 12:
-				case 13:
-					/* 110x xxxx 10xx xxxx */
-					count += 2;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80) {
-						throw new UTFDataFormatException("malformed input around byte " + count);
-					}
-					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-					break;
-				case 14:
-					/* 1110 xxxx 10xx xxxx 10xx xxxx */
-					count += 3;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 2];
-					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-					}
-					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-					break;
-				default:
-					/* 10xx xxxx, 1111 xxxx */
-					throw new UTFDataFormatException("malformed input around byte " + count);
-				}
-			}
-			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararr_count);
-		}
-
-		@Override
-		public int readUnsignedByte() throws IOException {
-			if (this.position < this.end) {
-				return (this.buffer[this.position++] & 0xff);
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public int readUnsignedShort() throws IOException {
-			if (this.position < this.end - 1) {
-				return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
-			} else {
-				throw new EOFException();
-			}
-		}
-		
-		@Override
-		public int skipBytes(int n) throws IOException {
-			if (this.position <= this.end - n) {
-				this.position += n;
-				return n;
-			} else {
-				n = this.end - this.position;
-				this.position = this.end;
-				return n;
-			}
-		}
-		
-		@SuppressWarnings("restriction")
-		private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-		
-		@SuppressWarnings("restriction")
-		private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-		
-		private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			int skippedBytes = skipBytes(numBytes);
-
-			if (skippedBytes < numBytes){
-				throw new EOFException("Could not skip " + numBytes +" bytes.");
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			if (b == null) {
-				throw new NullPointerException("Byte array b cannot be null.");
-			}
-
-			if (off < 0) {
-				throw new IndexOutOfBoundsException("Offset cannot be negative.");
-			}
-
-			if (len < 0) {
-				throw new IndexOutOfBoundsException("Length cannot be negative.");
-			}
-
-			if (b.length - off < len) {
-				throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
-						".");
-			}
-
-			if (this.position >= this.end) {
-				return -1;
-			} else {
-				int toRead = Math.min(this.end-this.position, len);
-				System.arraycopy(this.buffer,this.position,b,off,toRead);
-				this.position += toRead;
-
-				return toRead;
-			}
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2063fa12/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/LongSerializationSpeedBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/LongSerializationSpeedBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/LongSerializationSpeedBenchmark.java
deleted file mode 100644
index 3e2605a..0000000
--- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/LongSerializationSpeedBenchmark.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark.core.memory;
-
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.benchmark.core.memory.segments.CoreMemorySegmentOutView;
-import org.apache.flink.benchmark.core.memory.segments.PureHeapMemorySegment;
-import org.apache.flink.benchmark.core.memory.segments.PureHeapMemorySegmentOutView;
-import org.apache.flink.benchmark.core.memory.segments.PureHybridMemorySegment;
-import org.apache.flink.benchmark.core.memory.segments.PureHybridMemorySegmentOutView;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
-import org.openjdk.jmh.annotations.*;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-
-@State(Scope.Benchmark)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class LongSerializationSpeedBenchmark {
-	
-	private final static int LARGE_SEGMENT_SIZE = 1024 * 1024 * 1024;
-	
-	private final byte[] largeSegment = new byte[LARGE_SEGMENT_SIZE];
-	
-	private final static long innerRounds = LARGE_SEGMENT_SIZE / 8;
-	
-	private final static int outerRounds = 10;
-	
-	private MemorySegment coreHeap;
-	
-	private MemorySegment coreHybridOnHeap;
-	
-	private MemorySegment coreHybridOffHeap;
-	
-	private PureHeapMemorySegment pureHeap;
-	
-	private PureHybridMemorySegment pureHybridOnHeap;
-	
-	private PureHybridMemorySegment pureHybridOffHeap;
-	
-	private LongSerializer ser;
-	
-	
-	@Setup
-	public void init() {
-		final ByteBuffer largeOffHeap = ByteBuffer.allocateDirect(LARGE_SEGMENT_SIZE);
-		
-		fillOnHeap(largeSegment, (byte) -1);
-		fillOffHeap(largeOffHeap, (byte) -1);
-		
-		this.coreHeap = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, null);
-		this.coreHybridOnHeap = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, null);
-		this.coreHybridOffHeap = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(largeOffHeap, null);
-		this.pureHeap = new PureHeapMemorySegment(largeSegment);
-		this.pureHybridOnHeap = new PureHybridMemorySegment(largeSegment);
-		this.pureHybridOffHeap = new PureHybridMemorySegment(largeOffHeap);
-		this.ser = LongSerializer.INSTANCE;
-	}
-	
-	@Benchmark
-	public void coreHeapMemorySegment() throws Exception {
-		
-		ArrayList<MemorySegment> memory = new ArrayList<>();
-		memory.add(coreHeap);
-		ArrayList<MemorySegment> target = new ArrayList<>();
-		
-		CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(coreHeap);
-			output.reset();
-		}
-	}
-	
-	@Benchmark
-	public void coreHybridOnHeapMemorySegment() throws Exception {
-		
-		ArrayList<MemorySegment> memory = new ArrayList<>();
-		memory.add(coreHybridOnHeap);
-		ArrayList<MemorySegment> target = new ArrayList<>();
-		
-		CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(coreHybridOnHeap);
-			output.reset();
-		}
-	}
-	
-	@Benchmark
-	public void coreHybridOffHeapMemorySegment() throws Exception {
-		
-		ArrayList<MemorySegment> memory = new ArrayList<>();
-		memory.add(coreHybridOffHeap);
-		ArrayList<MemorySegment> target = new ArrayList<>();
-		
-		CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(coreHybridOffHeap);
-			output.reset();
-		}
-	}
-	
-	@Benchmark
-	public void pureHeapMemorySegment() throws Exception {
-		
-		ArrayList<PureHeapMemorySegment> memory = new ArrayList<>();
-		memory.add(pureHeap);
-		ArrayList<PureHeapMemorySegment> target = new ArrayList<>();
-		
-		PureHeapMemorySegmentOutView output = new PureHeapMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(pureHeap);
-			output.reset();
-		}
-	}
-	
-	@Benchmark
-	public void pureHybridOnHeapMemorySegment() throws Exception {
-		
-		ArrayList<PureHybridMemorySegment> memory = new ArrayList<>();
-		memory.add(pureHybridOnHeap);
-		ArrayList<PureHybridMemorySegment> target = new ArrayList<>();
-		
-		PureHybridMemorySegmentOutView output = new PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(pureHybridOnHeap);
-			output.reset();
-		}
-	}
-	
-	@Benchmark
-	public void pureHybridOffHeapMemorySegment() throws Exception {
-		
-		ArrayList<PureHybridMemorySegment> memory = new ArrayList<>();
-		memory.add(pureHybridOffHeap);
-		ArrayList<PureHybridMemorySegment> target = new ArrayList<>();
-		
-		PureHybridMemorySegmentOutView output = new PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			for (long i = 0; i < innerRounds; i++) {
-				ser.serialize(i, output);
-			}
-			
-			target.clear();
-			memory.add(pureHybridOffHeap);
-			output.reset();
-		}
-	}
-	
-	private static void fillOnHeap(byte[] buffer, byte data) {
-		for (int i = 0; i < buffer.length; i++) {
-			buffer[i] = data;
-		}
-	}
-	
-	private static void fillOffHeap(ByteBuffer buffer, byte data) {
-		final int len = buffer.capacity();
-		for (int i = 0; i < len; i++) {
-			buffer.put(i, data);
-		}
-	}
-	
-	public static void main(String[] args) throws Exception {
-		Options opt = new OptionsBuilder()
-				.include(LongSerializationSpeedBenchmark.class.getSimpleName())
-				.warmupIterations(2)
-				.measurementIterations(2)
-				.forks(1)
-				.build();
-		new Runner(opt).run();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2063fa12/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/StringSerializationSpeedBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/StringSerializationSpeedBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/StringSerializationSpeedBenchmark.java
deleted file mode 100644
index d1268a7..0000000
--- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/StringSerializationSpeedBenchmark.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark.core.memory;
-
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.benchmark.core.memory.segments.*;
-import org.openjdk.jmh.annotations.*;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-@State(Scope.Thread)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class StringSerializationSpeedBenchmark {
-	
-	private final int LARGE_SEGMENT_SIZE = 1024 * 1024 * 1024;
-	
-	private final byte[] largeSegment = new byte[LARGE_SEGMENT_SIZE];
-	
-	private final int outerRounds = 10;
-	
-	private final int innerRounds = 5000;
-	
-	private ByteBuffer largeOffHeap;
-	
-	private String[] randomStrings;
-	
-	private StringSerializer ser;
-	
-	@Setup
-	public void init() throws Exception {
-		
-		this.largeOffHeap = ByteBuffer.allocateDirect(LARGE_SEGMENT_SIZE);
-		this.randomStrings = generateRandomStrings(5468917685263896L, 1000, 128, 6, true);
-		this.ser = StringSerializer.INSTANCE;
-		
-	}
-	
-	@Benchmark
-	public void coreHeapMemorySegment() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<MemorySegment> memory = new ArrayList<>();
-			memory.add(HeapMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, null));
-			ArrayList<MemorySegment> target = new ArrayList<>();
-			
-			CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	@Benchmark
-	public void coreHybridMemorySegmentOnHeap() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<MemorySegment> memory = new ArrayList<>();
-			memory.add(HybridMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, null));
-			ArrayList<MemorySegment> target = new ArrayList<>();
-			
-			CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	@Benchmark
-	public void coreHybridMemorySegmentOffHeap() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<MemorySegment> memory = new ArrayList<>();
-			memory.add(HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(largeOffHeap, null));
-			ArrayList<MemorySegment> target = new ArrayList<>();
-			
-			CoreMemorySegmentOutView output = new CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	@Benchmark
-	public void pureHybridMemorySegmentOnheap() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<PureHybridMemorySegment> memory = new ArrayList<>();
-			memory.add(new PureHybridMemorySegment(largeSegment));
-			ArrayList<PureHybridMemorySegment> target = new ArrayList<>();
-			
-			PureHybridMemorySegmentOutView output = new PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	@Benchmark
-	public void pureHybridMemorySegmentOffHeap() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<PureHybridMemorySegment> memory = new ArrayList<>();
-			memory.add(new PureHybridMemorySegment(largeOffHeap));
-			ArrayList<PureHybridMemorySegment> target = new ArrayList<>();
-			
-			PureHybridMemorySegmentOutView output = new PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	@Benchmark
-	public void pureHeapMemorySegment() throws Exception {
-		
-		for (int outer = 0; outer < outerRounds; outer++) {
-			
-			ArrayList<PureHeapMemorySegment> memory = new ArrayList<>();
-			memory.add(new PureHeapMemorySegment(largeSegment));
-			ArrayList<PureHeapMemorySegment> target = new ArrayList<>();
-			
-			PureHeapMemorySegmentOutView output = new PureHeapMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-			
-			for (int i = 0; i < innerRounds; i++) {
-				for (String s : randomStrings) {
-					ser.serialize(s, output);
-				}
-			}
-		}
-	}
-	
-	private static String[] generateRandomStrings(long seed, int num, int maxLen, int minLen, boolean asciiOnly) {
-		Random rnd = new Random(seed);
-		String[] array = new String[num];
-		StringBuilder bld = new StringBuilder(maxLen);
-		
-		int minCharValue = 40;
-		int charRange = asciiOnly ? 60 : 30000;
-		
-		for (int i = 0; i < num; i++) {
-			bld.setLength(0);
-			int len = rnd.nextInt(maxLen - minLen) + minLen;
-			
-			for (int k = 0; k < len; k++) {
-				bld.append((char) (rnd.nextInt(charRange) + minCharValue));
-			}
-			
-			array[i] = bld.toString();
-		}
-		
-		return array;
-	}
-	
-	public static void main(String[] args) throws Exception {
-		Options opt = new OptionsBuilder()
-				.include(StringSerializationSpeedBenchmark.class.getSimpleName())
-				.warmupIterations(2)
-				.measurementIterations(2)
-				.forks(1)
-				.build();
-		new Runner(opt).run();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2063fa12/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/CoreMemorySegmentOutView.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/CoreMemorySegmentOutView.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/CoreMemorySegmentOutView.java
deleted file mode 100644
index 58c42ac..0000000
--- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/core/memory/segments/CoreMemorySegmentOutView.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark.core.memory.segments;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.util.List;
-
-public final class CoreMemorySegmentOutView implements DataOutputView {
-
-	private MemorySegment currentSegment;	// the current memory segment to write to
-
-	private int positionInSegment;					// the offset in the current segment
-	
-	private final int segmentSize;				// the size of the memory segments
-
-	private final  List<MemorySegment> memorySource;
-	
-	private final List<MemorySegment> fullSegments;
-	
-
-	private byte[] utfBuffer;		// the reusable array for UTF encodings
-
-
-	public CoreMemorySegmentOutView(List<MemorySegment> emptySegments,
-									List<MemorySegment> fullSegmentTarget, int segmentSize) {
-		this.segmentSize = segmentSize;
-		this.currentSegment = emptySegments.remove(emptySegments.size() - 1);
-
-		this.memorySource = emptySegments;
-		this.fullSegments = fullSegmentTarget;
-		this.fullSegments.add(getCurrentSegment());
-	}
-
-
-	public void reset() {
-		if (this.fullSegments.size() != 0) {
-			throw new IllegalStateException("The target list still contains memory segments.");
-		}
-
-		clear();
-		try {
-			advance();
-		}
-		catch (IOException ioex) {
-			throw new RuntimeException("Error getting first segment for record collector.", ioex);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                  Page Management
-	// --------------------------------------------------------------------------------------------
-
-	public MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws EOFException {
-		int size = this.memorySource.size();
-		if (size > 0) {
-			final MemorySegment next = this.memorySource.remove(size - 1);
-			this.fullSegments.add(next);
-			return next;
-		} else {
-			throw new EOFException();
-		}
-	}
-	
-	public MemorySegment getCurrentSegment() {
-		return this.currentSegment;
-	}
-
-	public int getCurrentPositionInSegment() {
-		return this.positionInSegment;
-	}
-	
-	public int getSegmentSize() {
-		return this.segmentSize;
-	}
-	
-	protected void advance() throws IOException {
-		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
-		this.positionInSegment = 0;
-	}
-	
-	protected void seekOutput(MemorySegment seg, int position) {
-		this.currentSegment = seg;
-		this.positionInSegment = position;
-	}
-
-	protected void clear() {
-		this.currentSegment = null;
-		this.positionInSegment = 0;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                               Data Output Specific methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void write(int b) throws IOException {
-		writeByte(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		int remaining = this.segmentSize - this.positionInSegment;
-		if (remaining >= len) {
-			this.currentSegment.put(this.positionInSegment, b, off, len);
-			this.positionInSegment += len;
-		}
-		else {
-			if (remaining == 0) {
-				advance();
-				remaining = this.segmentSize - this.positionInSegment;
-			}
-			while (true) {
-				int toPut = Math.min(remaining, len);
-				this.currentSegment.put(this.positionInSegment, b, off, toPut);
-				off += toPut;
-				len -= toPut;
-
-				if (len > 0) {
-					this.positionInSegment = this.segmentSize;
-					advance();
-					remaining = this.segmentSize - this.positionInSegment;
-				}
-				else {
-					this.positionInSegment += toPut;
-					break;
-				}
-			}
-		}
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		writeByte(v ? 1 : 0);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize) {
-			this.currentSegment.put(this.positionInSegment++, (byte) v);
-		}
-		else {
-			advance();
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeShort(v);
-		}
-		else {
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeChar(v);
-		}
-		else {
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 3) {
-			this.currentSegment.putIntBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 4;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeInt(v);
-		}
-		else {
-			writeByte(v >> 24);
-			writeByte(v >> 16);
-			writeByte(v >>  8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 7) {
-			this.currentSegment.putLongBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 8;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeLong(v);
-		}
-		else {
-			writeByte((int) (v >> 56));
-			writeByte((int) (v >> 48));
-			writeByte((int) (v >> 40));
-			writeByte((int) (v >> 32));
-			writeByte((int) (v >> 24));
-			writeByte((int) (v >> 16));
-			writeByte((int) (v >>  8));
-			writeByte((int) v);
-		}
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		writeInt(Float.floatToRawIntBits(v));
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		writeLong(Double.doubleToRawLongBits(v));
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		for (int i = 0; i < s.length(); i++) {
-			writeByte(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		for (int i = 0; i < s.length(); i++) {
-			writeChar(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeUTF(String str) throws IOException {
-		int strlen = str.length();
-		int utflen = 0;
-		int c, count = 0;
-
-		/* use charAt instead of copying String to char array */
-		for (int i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				utflen++;
-			} else if (c > 0x07FF) {
-				utflen += 3;
-			} else {
-				utflen += 2;
-			}
-		}
-
-		if (utflen > 65535) {
-			throw new UTFDataFormatException("encoded string too long: " + utflen + " memory");
-		}
-
-		if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
-			this.utfBuffer = new byte[utflen + 2];
-		}
-		final byte[] bytearr = this.utfBuffer;
-
-		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) (utflen & 0xFF);
-
-		int i = 0;
-		for (i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F))) {
-				break;
-			}
-			bytearr[count++] = (byte) c;
-		}
-
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				bytearr[count++] = (byte) c;
-
-			} else if (c > 0x07FF) {
-				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			} else {
-				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			}
-		}
-
-		write(bytearr, 0, utflen + 2);
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		while (numBytes > 0) {
-			final int remaining = this.segmentSize - this.positionInSegment;
-			if (numBytes <= remaining) {
-				this.positionInSegment += numBytes;
-				return;
-			}
-			this.positionInSegment = this.segmentSize;
-			advance();
-			numBytes -= remaining;
-		}
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		while (numBytes > 0) {
-			final int remaining = this.segmentSize - this.positionInSegment;
-			if (numBytes <= remaining) {
-				this.currentSegment.put(source, this.positionInSegment, numBytes);
-				this.positionInSegment += numBytes;
-				return;
-			}
-
-			if (remaining > 0) {
-				this.currentSegment.put(source, this.positionInSegment, remaining);
-				this.positionInSegment = this.segmentSize;
-				numBytes -= remaining;
-			}
-
-			advance();
-		}
-	}
-}