You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/26 22:50:45 UTC

[2/8] flink git commit: [FLINK-2890] Port StringSerializationSpeedBenchmark to JMH. [FLINK-2889] Port LongSerializationSpeedBenchmark to JMH.

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
deleted file mode 100644
index e247eed..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.MemoryUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A memory segment that is backed exclusively by a heap byte array.
- *
- * <p>Single bytes and byte ranges are accessed through plain array operations. Multi-byte
- * primitives are read and written through {@code sun.misc.Unsafe} in the platform's native byte
- * order, after an explicit bounds check. The {@code ...LittleEndian} and {@code ...BigEndian}
- * variants reverse the bytes whenever the requested order differs from the native one.
- *
- * <p>After {@link #free()} the reference to the backing array is cleared, so every later access
- * to the data fails with a {@link NullPointerException}.
- */
-public final class PureHeapMemorySegment {
-
-	/** Constant that flags the byte order. Because this is a boolean constant,
-	 * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */
-	private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
-
-	/** The array in which the data is stored. Set to null by {@link #free()}. */
-	private byte[] memory;
-
-	/** Wrapper for I/O requests. Created lazily by {@link #wrap(int, int)} and reused afterwards. */
-	private ByteBuffer wrapper;
-
-	/** The size, stored extra, because we may clear the reference to the byte array */
-	private final int size;
-
-	// -------------------------------------------------------------------------
-	//                             Constructors
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Creates a new memory segment that represents the data in the given byte array.
-	 * The array is used directly, it is not copied.
-	 *
-	 * @param memory The byte array that holds the data.
-	 * @throws NullPointerException If the given array is null.
-	 */
-	public PureHeapMemorySegment(byte[] memory) {
-		this.memory = memory;
-		this.size = memory.length;
-	}
-
-	// -------------------------------------------------------------------------
-	//                      Heap Memory Segment Specifics
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Gets the byte array that backs this memory segment.
-	 *
-	 * @return The byte array that backs this memory segment, or null after {@link #free()}.
-	 */
-	public byte[] getArray() {
-		return this.memory;
-	}
-
-	// -------------------------------------------------------------------------
-	//                        MemorySegment Accessors
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Checks whether this segment has been freed.
-	 *
-	 * @return True, if the reference to the backing array has been cleared, false otherwise.
-	 */
-	public final boolean isFreed() {
-		return this.memory == null;
-	}
-
-	/**
-	 * Frees this segment by dropping the references to the backing array and to the
-	 * cached byte buffer wrapper.
-	 */
-	public final void free() {
-		this.wrapper = null;
-		this.memory = null;
-	}
-
-	/**
-	 * Gets the size of the segment, as recorded at construction time.
-	 *
-	 * @return The length of the byte array this segment was created with.
-	 */
-	public final int size() {
-		return this.size;
-	}
-
-	/**
-	 * Exposes the given region of the segment as a byte buffer. The same buffer instance is
-	 * reused across calls; each call re-positions it, so a previously returned reference
-	 * reflects the most recent call.
-	 *
-	 * @param offset The start of the region within the segment.
-	 * @param length The number of bytes in the region.
-	 * @return A byte buffer whose position is {@code offset} and whose limit is
-	 *         {@code offset + length}.
-	 * @throws IndexOutOfBoundsException If offset or length are negative, or if the region
-	 *                                   exceeds the segment.
-	 */
-	public final ByteBuffer wrap(int offset, int length) {
-		// Validate the lower bounds as well: without this, negative arguments surface as an
-		// IndexOutOfBoundsException on the first call (ByteBuffer.wrap) but as an
-		// IllegalArgumentException on later calls (Buffer.limit / Buffer.position).
-		// With both values non-negative, the subtraction below cannot overflow.
-		if (offset < 0 || length < 0 || offset > this.memory.length - length) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		if (this.wrapper == null) {
-			this.wrapper = ByteBuffer.wrap(this.memory, offset, length);
-		}
-		else {
-			// set the limit first, so that the new position is never beyond the limit
-			this.wrapper.limit(offset + length);
-			this.wrapper.position(offset);
-		}
-
-		return this.wrapper;
-	}
-
-	// ------------------------------------------------------------------------
-	//                    Random Access get() and put() methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Reads the byte at the given index. Bounds are checked by the array access.
-	 *
-	 * @param index The position of the byte.
-	 * @return The byte at the given position.
-	 */
-	public final byte get(int index) {
-		return this.memory[index];
-	}
-
-	/**
-	 * Writes the given byte to the given index. Bounds are checked by the array access.
-	 *
-	 * @param index The position to write to.
-	 * @param b The byte to write.
-	 */
-	public final void put(int index, byte b) {
-		this.memory[index] = b;
-	}
-
-	/**
-	 * Fills the entire destination array with bytes from this segment, starting at the given index.
-	 *
-	 * @param index The position in the segment to start reading from.
-	 * @param dst The array to fill.
-	 */
-	public final void get(int index, byte[] dst) {
-		get(index, dst, 0, dst.length);
-	}
-
-	/**
-	 * Copies the entire source array into this segment, starting at the given index.
-	 *
-	 * @param index The position in the segment to start writing to.
-	 * @param src The array to copy from.
-	 */
-	public final void put(int index, byte[] src) {
-		put(index, src, 0, src.length);
-	}
-
-	/**
-	 * Copies bytes from this segment into a region of the destination array.
-	 *
-	 * @param index The position in the segment to start reading from.
-	 * @param dst The array to copy into.
-	 * @param offset The position in the destination array to start writing to.
-	 * @param length The number of bytes to copy.
-	 */
-	public final void get(int index, byte[] dst, int offset, int length) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(this.memory, index, dst, offset, length);
-	}
-
-	/**
-	 * Copies a region of the source array into this segment.
-	 *
-	 * @param index The position in the segment to start writing to.
-	 * @param src The array to copy from.
-	 * @param offset The position in the source array to start reading from.
-	 * @param length The number of bytes to copy.
-	 */
-	public final void put(int index, byte[] src, int offset, int length) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(src, offset, this.memory, index, length);
-	}
-
-	/**
-	 * Reads one byte and interprets it as a boolean.
-	 *
-	 * @param index The position of the byte.
-	 * @return False if the byte is zero, true for any other value.
-	 */
-	public final boolean getBoolean(int index) {
-		return this.memory[index] != 0;
-	}
-
-	/**
-	 * Writes a boolean as a single byte, 1 for true and 0 for false.
-	 *
-	 * @param index The position to write to.
-	 * @param value The boolean to write.
-	 */
-	public final void putBoolean(int index, boolean value) {
-		this.memory[index] = (byte) (value ? 1 : 0);
-	}
-
-	/**
-	 * Reads a char (2 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @return The char value at the given position.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 2.
-	 */
-	@SuppressWarnings("restriction")
-	public final char getChar(int index) {
-		if (index >= 0 && index <= this.memory.length - 2) {
-			return UNSAFE.getChar(this.memory, BASE_OFFSET + index);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a char in little-endian byte order; see {@link #getChar(int)} for the bounds check. */
-	public final char getCharLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getChar(index);
-		} else {
-			return Character.reverseBytes(getChar(index));
-		}
-	}
-
-	/** Reads a char in big-endian byte order; see {@link #getChar(int)} for the bounds check. */
-	public final char getCharBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Character.reverseBytes(getChar(index));
-		} else {
-			return getChar(index);
-		}
-	}
-
-	/**
-	 * Writes a char (2 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @param value The char to write.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 2.
-	 */
-	@SuppressWarnings("restriction")
-	public final void putChar(int index, char value) {
-		if (index >= 0 && index <= this.memory.length - 2) {
-			UNSAFE.putChar(this.memory, BASE_OFFSET + index, value);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a char in little-endian byte order; see {@link #putChar(int, char)} for the bounds check. */
-	public final void putCharLittleEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, value);
-		} else {
-			putChar(index, Character.reverseBytes(value));
-		}
-	}
-
-	/** Writes a char in big-endian byte order; see {@link #putChar(int, char)} for the bounds check. */
-	public final void putCharBigEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, Character.reverseBytes(value));
-		} else {
-			putChar(index, value);
-		}
-	}
-
-	/**
-	 * Reads a short (2 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @return The short value at the given position.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 2.
-	 */
-	@SuppressWarnings("restriction")
-	public final short getShort(int index) {
-		if (index >= 0 && index <= this.memory.length - 2) {
-			return UNSAFE.getShort(this.memory, BASE_OFFSET + index);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a short in little-endian byte order; see {@link #getShort(int)} for the bounds check. */
-	public final short getShortLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getShort(index);
-		} else {
-			return Short.reverseBytes(getShort(index));
-		}
-	}
-
-	/** Reads a short in big-endian byte order; see {@link #getShort(int)} for the bounds check. */
-	public final short getShortBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Short.reverseBytes(getShort(index));
-		} else {
-			return getShort(index);
-		}
-	}
-
-	/**
-	 * Writes a short (2 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @param value The short to write.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 2.
-	 */
-	@SuppressWarnings("restriction")
-	public final void putShort(int index, short value) {
-		if (index >= 0 && index <= this.memory.length - 2) {
-			UNSAFE.putShort(this.memory, BASE_OFFSET + index, value);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a short in little-endian byte order; see {@link #putShort(int, short)} for the bounds check. */
-	public final void putShortLittleEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, value);
-		} else {
-			putShort(index, Short.reverseBytes(value));
-		}
-	}
-
-	/** Writes a short in big-endian byte order; see {@link #putShort(int, short)} for the bounds check. */
-	public final void putShortBigEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, Short.reverseBytes(value));
-		} else {
-			putShort(index, value);
-		}
-	}
-
-	/**
-	 * Reads an int (4 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @return The int value at the given position.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 4.
-	 */
-	@SuppressWarnings("restriction")
-	public final int getInt(int index) {
-		if (index >= 0 && index <= this.memory.length - 4) {
-			return UNSAFE.getInt(this.memory, BASE_OFFSET + index);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads an int in little-endian byte order; see {@link #getInt(int)} for the bounds check. */
-	public final int getIntLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getInt(index);
-		} else {
-			return Integer.reverseBytes(getInt(index));
-		}
-	}
-
-	/** Reads an int in big-endian byte order; see {@link #getInt(int)} for the bounds check. */
-	public final int getIntBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Integer.reverseBytes(getInt(index));
-		} else {
-			return getInt(index);
-		}
-	}
-
-	/**
-	 * Writes an int (4 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @param value The int to write.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 4.
-	 */
-	@SuppressWarnings("restriction")
-	public final void putInt(int index, int value) {
-		if (index >= 0 && index <= this.memory.length - 4) {
-			UNSAFE.putInt(this.memory, BASE_OFFSET + index, value);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes an int in little-endian byte order; see {@link #putInt(int, int)} for the bounds check. */
-	public final void putIntLittleEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, value);
-		} else {
-			putInt(index, Integer.reverseBytes(value));
-		}
-	}
-
-	/** Writes an int in big-endian byte order; see {@link #putInt(int, int)} for the bounds check. */
-	public final void putIntBigEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, Integer.reverseBytes(value));
-		} else {
-			putInt(index, value);
-		}
-	}
-
-	/**
-	 * Reads a long (8 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @return The long value at the given position.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 8.
-	 */
-	@SuppressWarnings("restriction")
-	public final long getLong(int index) {
-		if (index >= 0 && index <= this.memory.length - 8) {
-			return UNSAFE.getLong(this.memory, BASE_OFFSET + index);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a long in little-endian byte order; see {@link #getLong(int)} for the bounds check. */
-	public final long getLongLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getLong(index);
-		} else {
-			return Long.reverseBytes(getLong(index));
-		}
-	}
-
-	/** Reads a long in big-endian byte order; see {@link #getLong(int)} for the bounds check. */
-	public final long getLongBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Long.reverseBytes(getLong(index));
-		} else {
-			return getLong(index);
-		}
-	}
-
-	/**
-	 * Writes a long (8 bytes) in the platform's native byte order.
-	 *
-	 * @param index The position of the first byte.
-	 * @param value The long to write.
-	 * @throws IndexOutOfBoundsException If index is negative or larger than the array length minus 8.
-	 */
-	@SuppressWarnings("restriction")
-	public final void putLong(int index, long value) {
-		if (index >= 0 && index <= this.memory.length - 8) {
-			UNSAFE.putLong(this.memory, BASE_OFFSET + index, value);
-		} else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a long in little-endian byte order; see {@link #putLong(int, long)} for the bounds check. */
-	public final void putLongLittleEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, value);
-		} else {
-			putLong(index, Long.reverseBytes(value));
-		}
-	}
-
-	/** Writes a long in big-endian byte order; see {@link #putLong(int, long)} for the bounds check. */
-	public final void putLongBigEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, Long.reverseBytes(value));
-		} else {
-			putLong(index, value);
-		}
-	}
-
-	/** Reads a float as the bit pattern returned by {@link #getInt(int)}. */
-	public final float getFloat(int index) {
-		return Float.intBitsToFloat(getInt(index));
-	}
-
-	/** Reads a float as the bit pattern returned by {@link #getIntLittleEndian(int)}. */
-	public final float getFloatLittleEndian(int index) {
-		return Float.intBitsToFloat(getIntLittleEndian(index));
-	}
-
-	/** Reads a float as the bit pattern returned by {@link #getIntBigEndian(int)}. */
-	public final float getFloatBigEndian(int index) {
-		return Float.intBitsToFloat(getIntBigEndian(index));
-	}
-
-	/** Writes the raw bit pattern of the float via {@link #putInt(int, int)}. */
-	public final void putFloat(int index, float value) {
-		putInt(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Writes the raw bit pattern of the float via {@link #putIntLittleEndian(int, int)}. */
-	public final void putFloatLittleEndian(int index, float value) {
-		putIntLittleEndian(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Writes the raw bit pattern of the float via {@link #putIntBigEndian(int, int)}. */
-	public final void putFloatBigEndian(int index, float value) {
-		putIntBigEndian(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Reads a double as the bit pattern returned by {@link #getLong(int)}. */
-	public final double getDouble(int index) {
-		return Double.longBitsToDouble(getLong(index));
-	}
-
-	/** Reads a double as the bit pattern returned by {@link #getLongLittleEndian(int)}. */
-	public final double getDoubleLittleEndian(int index) {
-		return Double.longBitsToDouble(getLongLittleEndian(index));
-	}
-
-	/** Reads a double as the bit pattern returned by {@link #getLongBigEndian(int)}. */
-	public final double getDoubleBigEndian(int index) {
-		return Double.longBitsToDouble(getLongBigEndian(index));
-	}
-
-	/** Writes the raw bit pattern of the double via {@link #putLong(int, long)}. */
-	public final void putDouble(int index, double value) {
-		putLong(index, Double.doubleToRawLongBits(value));
-	}
-
-	/** Writes the raw bit pattern of the double via {@link #putLongLittleEndian(int, long)}. */
-	public final void putDoubleLittleEndian(int index, double value) {
-		putLongLittleEndian(index, Double.doubleToRawLongBits(value));
-	}
-
-	/** Writes the raw bit pattern of the double via {@link #putLongBigEndian(int, long)}. */
-	public final void putDoubleBigEndian(int index, double value) {
-		putLongBigEndian(index, Double.doubleToRawLongBits(value));
-	}
-
-	// -------------------------------------------------------------------------
-	//                     Bulk Read and Write Methods
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Writes a region of this segment to the given output.
-	 *
-	 * @param out The output to write to.
-	 * @param offset The position in the segment to start reading from.
-	 * @param length The number of bytes to write.
-	 * @throws IOException Forwarded from the output.
-	 */
-	public final void get(DataOutput out, int offset, int length) throws IOException {
-		out.write(this.memory, offset, length);
-	}
-
-	/**
-	 * Fills a region of this segment with bytes read from the given input.
-	 *
-	 * @param in The input to read from.
-	 * @param offset The position in the segment to start writing to.
-	 * @param length The number of bytes to read.
-	 * @throws IOException Forwarded from the input.
-	 */
-	public final void put(DataInput in, int offset, int length) throws IOException {
-		in.readFully(this.memory, offset, length);
-	}
-
-	/**
-	 * Copies bytes from this segment into the given byte buffer, at the buffer's current position.
-	 *
-	 * @param offset The position in the segment to start reading from.
-	 * @param target The buffer to copy into.
-	 * @param numBytes The number of bytes to copy.
-	 */
-	public final void get(int offset, ByteBuffer target, int numBytes) {
-		// ByteBuffer performs the boundary checks
-		target.put(this.memory, offset, numBytes);
-	}
-
-	/**
-	 * Copies bytes from the given byte buffer, starting at its current position, into this segment.
-	 *
-	 * @param offset The position in the segment to start writing to.
-	 * @param source The buffer to copy from.
-	 * @param numBytes The number of bytes to copy.
-	 */
-	public final void put(int offset, ByteBuffer source, int numBytes) {
-		// ByteBuffer performs the boundary checks
-		source.get(this.memory, offset, numBytes);
-	}
-
-	/**
-	 * Copies a region of this segment into another segment.
-	 *
-	 * @param offset The position in this segment to start reading from.
-	 * @param target The segment to copy into.
-	 * @param targetOffset The position in the target segment to start writing to.
-	 * @param numBytes The number of bytes to copy.
-	 */
-	public final void copyTo(int offset, PureHeapMemorySegment target, int targetOffset, int numBytes) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(this.memory, offset, target.memory, targetOffset, numBytes);
-	}
-
-	// -------------------------------------------------------------------------
-	//                      Comparisons & Swapping
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Compares a region of this segment with a region of another segment, byte by byte,
-	 * treating the bytes as unsigned values.
-	 *
-	 * @param seg2 The segment to compare against.
-	 * @param offset1 The start of the region in this segment.
-	 * @param offset2 The start of the region in the other segment.
-	 * @param len The number of bytes to compare.
-	 * @return Zero if the regions are equal (or len is not positive), otherwise the difference
-	 *         between the first pair of unsigned bytes that differ (this minus other).
-	 */
-	public final int compare(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
-		final byte[] b2 = seg2.memory;
-		final byte[] b1 = this.memory;
-
-		for (int pos = 0; pos < len; pos++) {
-			// mask to compare as unsigned bytes
-			final int diff = (b1[offset1 + pos] & 0xff) - (b2[offset2 + pos] & 0xff);
-			if (diff != 0) {
-				return diff;
-			}
-		}
-		return 0;
-	}
-
-	/**
-	 * Exchanges a region of this segment with a region of another segment, without an
-	 * auxiliary buffer.
-	 *
-	 * @param seg2 The segment to swap with.
-	 * @param offset1 The start of the region in this segment.
-	 * @param offset2 The start of the region in the other segment.
-	 * @param len The number of bytes to swap.
-	 */
-	public final void swapBytes(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
-		// swap by bytes (chunks of 8 first, then single bytes)
-		while (len >= 8) {
-			long tmp = this.getLong(offset1);
-			this.putLong(offset1, seg2.getLong(offset2));
-			seg2.putLong(offset2, tmp);
-			offset1 += 8;
-			offset2 += 8;
-			len -= 8;
-		}
-		while (len > 0) {
-			byte tmp = this.get(offset1);
-			this.put(offset1, seg2.get(offset2));
-			seg2.put(offset2, tmp);
-			offset1++;
-			offset2++;
-			len--;
-		}
-	}
-
-	/**
-	 * Exchanges a region of this segment with a region of another segment, using three bulk
-	 * copies through the given auxiliary buffer.
-	 *
-	 * @param auxBuffer Scratch space; must hold at least {@code len} bytes.
-	 * @param seg2 The segment to swap with.
-	 * @param offset1 The start of the region in this segment.
-	 * @param offset2 The start of the region in the other segment.
-	 * @param len The number of bytes to swap.
-	 */
-	public final void swapBytes(byte[] auxBuffer, PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
-		byte[] otherMem = seg2.memory;
-		System.arraycopy(this.memory, offset1, auxBuffer, 0, len);
-		System.arraycopy(otherMem, offset2, this.memory, offset1, len);
-		System.arraycopy(auxBuffer, 0, otherMem, offset2, len);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                     Utilities for native memory accesses and checks
-	// --------------------------------------------------------------------------------------------
-
-	/** The unsafe handle used for the multi-byte reads and writes. */
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	/** The offset of the first element of a byte array, as reported by the unsafe handle. */
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
deleted file mode 100644
index 1e3b89e..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.util.List;
-
-/**
- * A {@link DataOutputView} that writes into a sequence of {@link PureHeapMemorySegment}s.
- *
- * <p>Fresh segments are taken from the end of the "memory source" list. Every segment is added
- * to the "full segments" list at the moment it becomes the current write target, not when it
- * has been filled. Multi-byte values are written in big-endian byte order and may straddle a
- * segment boundary, in which case they are written byte by byte.
- */
-public final class PureHeapMemorySegmentOutView implements DataOutputView {
-
-	private PureHeapMemorySegment currentSegment;	// the current memory segment to write to
-
-	private int positionInSegment;					// the offset in the current segment
-
-	private final int segmentSize;				// the size of the memory segments
-
-	private final List<PureHeapMemorySegment> memorySource;	// the segments that are still unused
-
-	private final List<PureHeapMemorySegment> fullSegments;	// the segments that were handed out for writing
-
-
-	private byte[] utfBuffer;		// the reusable array for UTF encodings
-
-
-	/**
-	 * Creates a new output view. The last segment of the given empty segments is removed from
-	 * that list and becomes the first write target; it is also added to the target list.
-	 *
-	 * @param emptySegments The list to take segments from. Must contain at least one segment.
-	 *                      The list is kept by reference and shrinks as segments are consumed.
-	 * @param fullSegmentTarget The list to which every segment used for writing is added.
-	 * @param segmentSize The number of bytes to write into each segment.
-	 */
-	public PureHeapMemorySegmentOutView(List<PureHeapMemorySegment> emptySegments,
-										List<PureHeapMemorySegment> fullSegmentTarget, int segmentSize) {
-		this.segmentSize = segmentSize;
-		this.currentSegment = emptySegments.remove(emptySegments.size() - 1);
-
-		this.memorySource = emptySegments;
-		this.fullSegments = fullSegmentTarget;
-		this.fullSegments.add(getCurrentSegment());
-	}
-
-
-	/**
-	 * Restarts the view on a fresh segment from the memory source. The target list must have
-	 * been emptied by the caller beforehand.
-	 *
-	 * @throws IllegalStateException If the target list still contains segments.
-	 * @throws RuntimeException If no segment is available to start with.
-	 */
-	public void reset() {
-		if (this.fullSegments.size() != 0) {
-			throw new IllegalStateException("The target list still contains memory segments.");
-		}
-
-		clear();
-		try {
-			advance();
-		}
-		catch (IOException ioex) {
-			throw new RuntimeException("Error getting first segment for record collector.", ioex);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                                  Page Management
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Takes the last segment from the memory source and registers it in the target list.
-	 *
-	 * @param current The segment that was written so far (not used by this implementation).
-	 * @param positionInCurrent The write position in that segment (not used by this implementation).
-	 * @return The next segment to write to.
-	 * @throws EOFException If the memory source holds no more segments.
-	 */
-	public PureHeapMemorySegment nextSegment(PureHeapMemorySegment current, int positionInCurrent) throws EOFException {
-		int size = this.memorySource.size();
-		if (size > 0) {
-			final PureHeapMemorySegment next = this.memorySource.remove(size - 1);
-			this.fullSegments.add(next);
-			return next;
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	/**
-	 * Gets the segment that is currently written to.
-	 *
-	 * @return The current segment.
-	 */
-	public PureHeapMemorySegment getCurrentSegment() {
-		return this.currentSegment;
-	}
-
-	/**
-	 * Gets the write position within the current segment.
-	 *
-	 * @return The offset of the next byte to be written.
-	 */
-	public int getCurrentPositionInSegment() {
-		return this.positionInSegment;
-	}
-
-	/**
-	 * Gets the segment size this view was configured with.
-	 *
-	 * @return The number of bytes written into each segment.
-	 */
-	public int getSegmentSize() {
-		return this.segmentSize;
-	}
-
-	/**
-	 * Moves on to the next segment and sets the write position to its beginning.
-	 *
-	 * @throws IOException If no further segment is available.
-	 */
-	protected void advance() throws IOException {
-		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
-		this.positionInSegment = 0;
-	}
-
-	/**
-	 * Redirects the output to the given position in the given segment.
-	 *
-	 * @param seg The segment to write to from now on.
-	 * @param position The position in that segment to continue writing at.
-	 */
-	protected void seekOutput(PureHeapMemorySegment seg, int position) {
-		this.currentSegment = seg;
-		this.positionInSegment = position;
-	}
-
-	/**
-	 * Drops the current segment and sets the write position to zero.
-	 */
-	protected void clear() {
-		this.currentSegment = null;
-		this.positionInSegment = 0;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                               Data Output Specific methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void write(int b) throws IOException {
-		writeByte(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		int remaining = this.segmentSize - this.positionInSegment;
-		if (remaining >= len) {
-			// fast path: everything fits into the current segment
-			this.currentSegment.put(this.positionInSegment, b, off, len);
-			this.positionInSegment += len;
-		}
-		else {
-			if (remaining == 0) {
-				advance();
-				remaining = this.segmentSize - this.positionInSegment;
-			}
-			// spread the data over as many segments as needed
-			while (true) {
-				int toPut = Math.min(remaining, len);
-				this.currentSegment.put(this.positionInSegment, b, off, toPut);
-				off += toPut;
-				len -= toPut;
-
-				if (len > 0) {
-					// the current segment is full; mark it as such before moving on
-					this.positionInSegment = this.segmentSize;
-					advance();
-					remaining = this.segmentSize - this.positionInSegment;
-				}
-				else {
-					this.positionInSegment += toPut;
-					break;
-				}
-			}
-		}
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		writeByte(v ? 1 : 0);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize) {
-			this.currentSegment.put(this.positionInSegment++, (byte) v);
-		}
-		else {
-			// segment is full: move on and retry
-			advance();
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeShort(v);
-		}
-		else {
-			// the value straddles the segment boundary: write it byte-wise, high byte first
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeChar(v);
-		}
-		else {
-			// the value straddles the segment boundary: write it byte-wise, high byte first
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 3) {
-			this.currentSegment.putIntBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 4;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeInt(v);
-		}
-		else {
-			// the value straddles the segment boundary: write it byte-wise, high byte first
-			writeByte(v >> 24);
-			writeByte(v >> 16);
-			writeByte(v >>  8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 7) {
-			this.currentSegment.putLongBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 8;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeLong(v);
-		}
-		else {
-			// the value straddles the segment boundary: write it byte-wise, high byte first
-			writeByte((int) (v >> 56));
-			writeByte((int) (v >> 48));
-			writeByte((int) (v >> 40));
-			writeByte((int) (v >> 32));
-			writeByte((int) (v >> 24));
-			writeByte((int) (v >> 16));
-			writeByte((int) (v >>  8));
-			writeByte((int) v);
-		}
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		writeInt(Float.floatToRawIntBits(v));
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		writeLong(Double.doubleToRawLongBits(v));
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		// only the low byte of each character is written
-		for (int i = 0; i < s.length(); i++) {
-			writeByte(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		for (int i = 0; i < s.length(); i++) {
-			writeChar(s.charAt(i));
-		}
-	}
-
-	/**
-	 * Writes the string as a two-byte big-endian length followed by its encoded characters.
-	 * Characters in the range 0x0001 to 0x007F take one byte, characters above 0x07FF take
-	 * three bytes, and all others (including 0x0000) take two bytes.
-	 *
-	 * @param str The string to write.
-	 * @throws UTFDataFormatException If the encoded form is longer than 65535 bytes.
-	 * @throws IOException If no further segment is available.
-	 */
-	@Override
-	public void writeUTF(String str) throws IOException {
-		int strlen = str.length();
-		int utflen = 0;
-		int c, count = 0;
-
-		/* use charAt instead of copying String to char array */
-		for (int i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				utflen++;
-			} else if (c > 0x07FF) {
-				utflen += 3;
-			} else {
-				utflen += 2;
-			}
-		}
-
-		if (utflen > 65535) {
-			// the length is measured in encoded bytes, so report it as such
-			throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
-		}
-
-		// grow the reusable buffer if needed; 2 extra bytes for the length prefix
-		if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
-			this.utfBuffer = new byte[utflen + 2];
-		}
-		final byte[] bytearr = this.utfBuffer;
-
-		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) (utflen & 0xFF);
-
-		// fast loop for the leading run of single-byte characters
-		int i;
-		for (i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F))) {
-				break;
-			}
-			bytearr[count++] = (byte) c;
-		}
-
-		// general loop for the remainder
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				bytearr[count++] = (byte) c;
-
-			} else if (c > 0x07FF) {
-				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			} else {
-				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			}
-		}
-
-		write(bytearr, 0, utflen + 2);
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		while (numBytes > 0) {
-			final int remaining = this.segmentSize - this.positionInSegment;
-			if (numBytes <= remaining) {
-				this.positionInSegment += numBytes;
-				return;
-			}
-			// skip the rest of this segment and continue in the next one
-			this.positionInSegment = this.segmentSize;
-			advance();
-			numBytes -= remaining;
-		}
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		while (numBytes > 0) {
-			final int remaining = this.segmentSize - this.positionInSegment;
-			if (numBytes <= remaining) {
-				this.currentSegment.put(source, this.positionInSegment, numBytes);
-				this.positionInSegment += numBytes;
-				return;
-			}
-
-			// fill up what is left of the current segment, then continue in the next one
-			if (remaining > 0) {
-				this.currentSegment.put(source, this.positionInSegment, remaining);
-				this.positionInSegment = this.segmentSize;
-				numBytes -= remaining;
-			}
-
-			advance();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java
deleted file mode 100644
index 57817b9..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegment.java
+++ /dev/null
@@ -1,887 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.MemoryUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A memory segment that represents either a heap byte array or the memory of a direct
- * {@link ByteBuffer}, and accesses both through {@code sun.misc.Unsafe} using the same code path:
- * for heap memory the accesses are relative to the byte array, for off-heap memory the array
- * reference is {@code null} and {@code address} is used as an absolute address.
- *
- * <p>Every accessor validates the requested range against {@code addressLimit}. {@link #free()}
- * moves {@code address} beyond {@code addressLimit}, so that all subsequent checked accesses fail
- * with an {@link IllegalStateException}.
- */
-public final class PureHybridMemorySegment {
-
-	/** Constant that flags the byte order. Because this is a boolean constant,
-	 * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */
-	private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
-
-	/** The direct byte buffer that allocated the off-heap memory. This memory segment holds a reference
-	 * to that buffer, so as long as this memory segment lives, the memory will not be released. */
-	private final ByteBuffer offHeapMemory;
-
-	/** The heap byte array object relative to which we access the memory. Is non-null if the
-	 *  memory is on the heap, is null, if the memory is off the heap. If we have this buffer, we
-	 *  must never void this reference, or the memory segment will point to undefined addresses
-	 *  outside the heap and may in out-of-order execution cases cause segmentation faults. */
-	private final byte[] heapMemory;
-
-	/** The address to the data, relative to the heap memory byte array. If the heap memory byte array
-	 * is null, this becomes an absolute memory address outside the heap. */
-	private long address;
-
-	/** The address one byte after the last addressable byte.
-	 *  This is address + size while the segment is not disposed */
-	private final long addressLimit;
-
-	/** The size in bytes of the memory segment */
-	private final int size;
-
-	// -------------------------------------------------------------------------
-	//                             Constructors
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Creates a new memory segment that represents the memory backing the given direct byte buffer.
-	 * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
-	 * otherwise this method will throw an IllegalArgumentException.
-	 *
-	 * @param buffer The byte buffer whose memory is represented by this memory segment.
-	 * @throws IllegalArgumentException Thrown, if the given ByteBuffer is null or not direct.
-	 * @throws RuntimeException Thrown, if the buffer's address is too large to be handled safely.
-	 */
-	public PureHybridMemorySegment(ByteBuffer buffer) {
-		if (buffer == null || !buffer.isDirect()) {
-			throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer.");
-		}
-
-		this.offHeapMemory = buffer;
-		this.heapMemory = null;
-		this.size = buffer.capacity();
-		this.address = getAddress(buffer);
-		this.addressLimit = this.address + size;
-
-		if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) {
-			throw new RuntimeException("Segment initialized with too large address: " + address
-					+ " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
-		}
-	}
-
-	/**
-	 * Creates a new memory segment that represents the memory of the byte array.
-	 *
-	 * @param buffer The byte array whose memory is represented by this memory segment.
-	 * @throws NullPointerException Thrown, if the given array is null.
-	 */
-	public PureHybridMemorySegment(byte[] buffer) {
-		if (buffer == null) {
-			throw new NullPointerException("buffer");
-		}
-
-		this.offHeapMemory = null;
-		this.heapMemory = buffer;
-		this.address = BYTE_ARRAY_BASE_OFFSET;
-		this.addressLimit = BYTE_ARRAY_BASE_OFFSET + buffer.length;
-		this.size = buffer.length;
-	}
-
-	// -------------------------------------------------------------------------
-	//                      Memory Segment Specifics
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Gets the size of the memory segment, in bytes.
-	 * @return The size of the memory segment.
-	 */
-	public final int size() {
-		return size;
-	}
-
-	/**
-	 * Checks whether the memory segment was freed.
-	 * @return True, if the memory segment has been freed, false otherwise.
-	 */
-	public final boolean isFreed() {
-		return this.address > this.addressLimit;
-	}
-
-	/**
-	 * Frees this memory segment. After this operation has been called, no further operations are
-	 * possible on the memory segment and will fail. The actual memory (heap or off-heap) will only
-	 * be released after this memory segment object has become garbage collected.
-	 */
-	public final void free() {
-		// this ensures we can place no more data and trigger
-		// the checks for the freed segment
-		address = addressLimit + 1;
-	}
-
-	/**
-	 * Checks whether this memory segment is backed by off-heap memory.
-	 * @return True, if the memory segment is backed by off-heap memory, false if it is backed
-	 *         by heap memory.
-	 */
-	public final boolean isOffHeap() {
-		return heapMemory == null;
-	}
-
-	/**
-	 * Gets the byte array that backs this memory segment.
-	 *
-	 * @return The byte array that backs this memory segment.
-	 * @throws IllegalStateException Thrown, if this memory segment is not backed by heap memory.
-	 */
-	public byte[] getArray() {
-		if (heapMemory != null) {
-			return heapMemory;
-		} else {
-			throw new IllegalStateException("Memory segment does not represent heap memory");
-		}
-	}
-
-	/**
-	 * Gets the buffer that owns the memory of this memory segment.
-	 *
-	 * @return The byte buffer that owns the memory of this memory segment.
-	 * @throws IllegalStateException Thrown, if this memory segment is not backed by off-heap memory.
-	 */
-	public ByteBuffer getOffHeapBuffer() {
-		if (offHeapMemory != null) {
-			return offHeapMemory;
-		} else {
-			throw new IllegalStateException("Memory segment does not represent off heap memory");
-		}
-	}
-
-	/**
-	 * Wraps the given region of this memory segment in a ByteBuffer. For heap memory, the byte
-	 * array is wrapped; for off-heap memory, a duplicate of the direct buffer is returned whose
-	 * position and limit delimit the region.
-	 *
-	 * @param offset The offset of the region in this memory segment.
-	 * @param length The number of bytes in the region.
-	 * @return A ByteBuffer whose position is {@code offset} and whose limit is {@code offset + length}.
-	 * @throws IndexOutOfBoundsException Thrown, if the region is not within this memory segment.
-	 */
-	public ByteBuffer wrap(int offset, int length) {
-		if (offset < 0 || offset > this.size || offset > this.size - length) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		if (heapMemory != null) {
-			return ByteBuffer.wrap(heapMemory, offset, length);
-		}
-		else {
-			ByteBuffer wrapper = offHeapMemory.duplicate();
-			wrapper.limit(offset + length);
-			wrapper.position(offset);
-			return wrapper;
-		}
-	}
-
-	/**
-	 * Gets this memory segment as a pure heap memory segment.
-	 *
-	 * @return A heap memory segment variant of this memory segment.
-	 * @throws UnsupportedOperationException Thrown, if this memory segment is not
-	 *                                       backed by heap memory.
-	 */
-	public final PureHeapMemorySegment asHeapSegment() {
-		if (heapMemory != null) {
-			return new PureHeapMemorySegment(heapMemory);
-		} else {
-			throw new UnsupportedOperationException("Memory segment is not backed by heap memory");
-		}
-	}
-
-	/**
-	 * Gets this memory segment as a pure off-heap memory segment.
-	 *
-	 * @return An off-heap memory segment variant of this memory segment.
-	 * @throws UnsupportedOperationException Thrown, if this memory segment is not
-	 *                                       backed by off-heap memory.
-	 */
-	public final PureOffHeapMemorySegment asOffHeapSegment() {
-		if (offHeapMemory != null) {
-			return new PureOffHeapMemorySegment(offHeapMemory);
-		} else {
-			throw new UnsupportedOperationException("Memory segment is not backed by off-heap memory");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//                    Random Access get() and put() methods
-	// ------------------------------------------------------------------------
-	//
-	// All single-value accessors below follow the same pattern: the access is
-	// performed if the index is non-negative and the value fits before
-	// 'addressLimit'. Otherwise an IllegalStateException is thrown if the
-	// segment was freed, and an IndexOutOfBoundsException if it was not.
-	// The accessors without an endianness suffix use the byte order in which
-	// Unsafe reads and writes the value.
-
-	/**
-	 * Reads the byte at the given index.
-	 *
-	 * @param index The position of the byte.
-	 * @return The byte at the given position.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the index is outside the segment.
-	 */
-	@SuppressWarnings("restriction")
-	public final byte get(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos < addressLimit) {
-			return UNSAFE.getByte(heapMemory, pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/**
-	 * Writes the given byte to the given index.
-	 *
-	 * @param index The position at which to write the byte.
-	 * @param b The byte to write.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the index is outside the segment.
-	 */
-	@SuppressWarnings("restriction")
-	public final void put(int index, byte b) {
-		final long pos = address + index;
-		if (index >= 0 && pos < addressLimit) {
-			UNSAFE.putByte(heapMemory, pos, b);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Fills the entire given array with bytes read from this segment, starting at {@code index}. */
-	public final void get(int index, byte[] dst) {
-		get(index, dst, 0, dst.length);
-	}
-
-	/** Writes the entire given array into this segment, starting at {@code index}. */
-	public final void put(int index, byte[] src) {
-		put(index, src, 0, src.length);
-	}
-
-	/**
-	 * Copies {@code length} bytes from this segment, starting at {@code index}, into the given
-	 * array, starting at {@code offset}.
-	 *
-	 * @param index The position in this segment to start reading from.
-	 * @param dst The array to copy the bytes into.
-	 * @param offset The position in the array to start writing at.
-	 * @param length The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the array region or the segment region is invalid.
-	 */
-	@SuppressWarnings("restriction")
-	public final void get(int index, byte[] dst, int offset, int length) {
-		// check the byte array offset and length
-		if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		long pos = address + index;
-
-		if (index >= 0 && pos <= addressLimit - length) {
-			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
-
-			// the copy must proceed in batches not too large, because the JVM may
-			// poll for points that are safe for GC (moving the array and changing its address)
-			while (length > 0) {
-				long toCopy = Math.min(length, COPY_PER_BATCH);
-				UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, toCopy);
-				length -= toCopy;
-				pos += toCopy;
-				arrayAddress += toCopy;
-			}
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/**
-	 * Copies {@code length} bytes from the given array, starting at {@code offset}, into this
-	 * segment, starting at {@code index}.
-	 *
-	 * @param index The position in this segment to start writing at.
-	 * @param src The array to copy the bytes from.
-	 * @param offset The position in the array to start reading from.
-	 * @param length The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the array region or the segment region is invalid.
-	 */
-	@SuppressWarnings("restriction")
-	public final void put(int index, byte[] src, int offset, int length) {
-		// check the byte array offset and length
-		if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		long pos = address + index;
-
-		if (index >= 0 && pos <= addressLimit - length) {
-			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
-
-			// copy in batches of at most COPY_PER_BATCH bytes, like the array get(...) method
-			while (length > 0) {
-				long toCopy = Math.min(length, COPY_PER_BATCH);
-				UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, toCopy);
-				length -= toCopy;
-				pos += toCopy;
-				arrayAddress += toCopy;
-			}
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads one byte at the given index and interprets every non-zero value as {@code true}. */
-	public final boolean getBoolean(int index) {
-		return get(index) != 0;
-	}
-
-	/** Writes the boolean as one byte at the given index: 1 for {@code true}, 0 for {@code false}. */
-	public final void putBoolean(int index, boolean value) {
-		put(index, (byte) (value ? 1 : 0));
-	}
-
-	/** Reads a two-byte char at the given index. */
-	@SuppressWarnings("restriction")
-	public final char getChar(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			return UNSAFE.getChar(heapMemory, pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a char at the given index in little-endian byte order. */
-	public final char getCharLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getChar(index);
-		} else {
-			return Character.reverseBytes(getChar(index));
-		}
-	}
-
-	/** Reads a char at the given index in big-endian byte order. */
-	public final char getCharBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Character.reverseBytes(getChar(index));
-		} else {
-			return getChar(index);
-		}
-	}
-
-	/** Writes a two-byte char at the given index. */
-	@SuppressWarnings("restriction")
-	public final void putChar(int index, char value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			UNSAFE.putChar(heapMemory, pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a char at the given index in little-endian byte order. */
-	public final void putCharLittleEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, value);
-		} else {
-			putChar(index, Character.reverseBytes(value));
-		}
-	}
-
-	/** Writes a char at the given index in big-endian byte order. */
-	public final void putCharBigEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, Character.reverseBytes(value));
-		} else {
-			putChar(index, value);
-		}
-	}
-
-	/** Reads a two-byte short at the given index. */
-	@SuppressWarnings("restriction")
-	public final short getShort(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			return UNSAFE.getShort(heapMemory, pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a short at the given index in little-endian byte order. */
-	public final short getShortLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getShort(index);
-		} else {
-			return Short.reverseBytes(getShort(index));
-		}
-	}
-
-	/** Reads a short at the given index in big-endian byte order. */
-	public final short getShortBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Short.reverseBytes(getShort(index));
-		} else {
-			return getShort(index);
-		}
-	}
-
-	/** Writes a two-byte short at the given index. */
-	@SuppressWarnings("restriction")
-	public final void putShort(int index, short value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			UNSAFE.putShort(heapMemory, pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a short at the given index in little-endian byte order. */
-	public final void putShortLittleEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, value);
-		} else {
-			putShort(index, Short.reverseBytes(value));
-		}
-	}
-
-	/** Writes a short at the given index in big-endian byte order. */
-	public final void putShortBigEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, Short.reverseBytes(value));
-		} else {
-			putShort(index, value);
-		}
-	}
-
-	/** Reads a four-byte int at the given index. */
-	@SuppressWarnings("restriction")
-	public final int getInt(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 4) {
-			return UNSAFE.getInt(heapMemory, pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads an int at the given index in little-endian byte order. */
-	public final int getIntLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getInt(index);
-		} else {
-			return Integer.reverseBytes(getInt(index));
-		}
-	}
-
-	/** Reads an int at the given index in big-endian byte order. */
-	public final int getIntBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Integer.reverseBytes(getInt(index));
-		} else {
-			return getInt(index);
-		}
-	}
-
-	/** Writes a four-byte int at the given index. */
-	@SuppressWarnings("restriction")
-	public final void putInt(int index, int value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 4) {
-			UNSAFE.putInt(heapMemory, pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes an int at the given index in little-endian byte order. */
-	public final void putIntLittleEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, value);
-		} else {
-			putInt(index, Integer.reverseBytes(value));
-		}
-	}
-
-	/** Writes an int at the given index in big-endian byte order. */
-	public final void putIntBigEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, Integer.reverseBytes(value));
-		} else {
-			putInt(index, value);
-		}
-	}
-
-	/** Reads an eight-byte long at the given index. */
-	@SuppressWarnings("restriction")
-	public final long getLong(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 8) {
-			return UNSAFE.getLong(heapMemory, pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Reads a long at the given index in little-endian byte order. */
-	public final long getLongLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getLong(index);
-		} else {
-			return Long.reverseBytes(getLong(index));
-		}
-	}
-
-	/** Reads a long at the given index in big-endian byte order. */
-	public final long getLongBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Long.reverseBytes(getLong(index));
-		} else {
-			return getLong(index);
-		}
-	}
-
-	/** Writes an eight-byte long at the given index. */
-	@SuppressWarnings("restriction")
-	public final void putLong(int index, long value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 8) {
-			UNSAFE.putLong(heapMemory, pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/** Writes a long at the given index in little-endian byte order. */
-	public final void putLongLittleEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, value);
-		} else {
-			putLong(index, Long.reverseBytes(value));
-		}
-	}
-
-	/** Writes a long at the given index in big-endian byte order. */
-	public final void putLongBigEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, Long.reverseBytes(value));
-		} else {
-			putLong(index, value);
-		}
-	}
-
-	// floats and doubles are stored as the raw bit patterns of ints and longs
-
-	/** Reads a float at the given index, stored as the bits of an int read via {@link #getInt(int)}. */
-	public final float getFloat(int index) {
-		return Float.intBitsToFloat(getInt(index));
-	}
-
-	/** Reads a float at the given index in little-endian byte order. */
-	public final float getFloatLittleEndian(int index) {
-		return Float.intBitsToFloat(getIntLittleEndian(index));
-	}
-
-	/** Reads a float at the given index in big-endian byte order. */
-	public final float getFloatBigEndian(int index) {
-		return Float.intBitsToFloat(getIntBigEndian(index));
-	}
-
-	/** Writes the raw bits of the float at the given index via {@link #putInt(int, int)}. */
-	public final void putFloat(int index, float value) {
-		putInt(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Writes the raw bits of the float at the given index in little-endian byte order. */
-	public final void putFloatLittleEndian(int index, float value) {
-		putIntLittleEndian(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Writes the raw bits of the float at the given index in big-endian byte order. */
-	public final void putFloatBigEndian(int index, float value) {
-		putIntBigEndian(index, Float.floatToRawIntBits(value));
-	}
-
-	/** Reads a double at the given index, stored as the bits of a long read via {@link #getLong(int)}. */
-	public final double getDouble(int index) {
-		return Double.longBitsToDouble(getLong(index));
-	}
-
-	/** Reads a double at the given index in little-endian byte order. */
-	public final double getDoubleLittleEndian(int index) {
-		return Double.longBitsToDouble(getLongLittleEndian(index));
-	}
-
-	/** Reads a double at the given index in big-endian byte order. */
-	public final double getDoubleBigEndian(int index) {
-		return Double.longBitsToDouble(getLongBigEndian(index));
-	}
-
-	/** Writes the raw bits of the double at the given index via {@link #putLong(int, long)}. */
-	public final void putDouble(int index, double value) {
-		putLong(index, Double.doubleToRawLongBits(value));
-	}
-
-	/** Writes the raw bits of the double at the given index in little-endian byte order. */
-	public final void putDoubleLittleEndian(int index, double value) {
-		putLongLittleEndian(index, Double.doubleToRawLongBits(value));
-	}
-
-	/** Writes the raw bits of the double at the given index in big-endian byte order. */
-	public final void putDoubleBigEndian(int index, double value) {
-		putLongBigEndian(index, Double.doubleToRawLongBits(value));
-	}
-
-	// -------------------------------------------------------------------------
-	//                     Bulk Read and Write Methods
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Writes {@code length} bytes of this segment, starting at {@code offset}, to the given output.
-	 * Heap memory is written with a single array write; off-heap memory is written as big-endian
-	 * longs, followed by single bytes for the remainder.
-	 *
-	 * @param out The output to write the bytes to.
-	 * @param offset The position in this segment to start reading from.
-	 * @param length The number of bytes to write.
-	 * @throws IOException Thrown, if the output fails to accept the data.
-	 */
-	public final void get(DataOutput out, int offset, int length) throws IOException {
-		if (heapMemory != null) {
-			out.write(heapMemory, offset, length);
-		}
-		else {
-			while (length >= 8) {
-				out.writeLong(getLongBigEndian(offset));
-				offset += 8;
-				length -= 8;
-			}
-
-			while (length > 0) {
-				out.writeByte(get(offset));
-				offset++;
-				length--;
-			}
-		}
-	}
-
-	/**
-	 * Reads {@code length} bytes from the given input into this segment, starting at
-	 * {@code offset}. Heap memory is filled with a single {@code readFully} call; off-heap memory
-	 * is filled from big-endian longs, followed by single bytes for the remainder.
-	 *
-	 * @param in The input to read the bytes from.
-	 * @param offset The position in this segment to start writing at.
-	 * @param length The number of bytes to read.
-	 * @throws IOException Thrown, if the input fails to supply the data.
-	 */
-	public final void put(DataInput in, int offset, int length) throws IOException {
-		if (heapMemory != null) {
-			in.readFully(heapMemory, offset, length);
-		}
-		else {
-			while (length >= 8) {
-				putLongBigEndian(offset, in.readLong());
-				offset += 8;
-				length -= 8;
-			}
-			while(length > 0) {
-				put(offset, in.readByte());
-				offset++;
-				length--;
-			}
-		}
-	}
-
-	/**
-	 * Copies {@code numBytes} bytes from this segment, starting at {@code offset}, into the given
-	 * buffer at its current position. On success, the buffer's position is advanced by
-	 * {@code numBytes}, regardless of the kind of buffer.
-	 *
-	 * @param offset The position in this segment to start reading from.
-	 * @param target The buffer to copy the bytes into.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the region is not within this segment.
-	 * @throws BufferOverflowException Thrown, if the buffer has fewer than {@code numBytes} bytes remaining.
-	 */
-	@SuppressWarnings("restriction")
-	public final void get(int offset, ByteBuffer target, int numBytes) {
-		if (heapMemory != null) {
-			// ByteBuffer performs the boundary checks
-			target.put(heapMemory, offset, numBytes);
-		}
-		else {
-			// check the segment offset and length
-			if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			final int targetOffset = target.position();
-			final int remaining = target.remaining();
-
-			if (remaining < numBytes) {
-				throw new BufferOverflowException();
-			}
-
-			if (target.isDirect()) {
-				// copy to the target memory directly
-				final long targetPointer = getAddress(target) + targetOffset;
-				final long sourcePointer = address + offset;
-
-				if (sourcePointer <= addressLimit - numBytes) {
-					UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
-
-					// advance the buffer like the heap and array paths do
-					target.position(targetOffset + numBytes);
-				}
-				else if (address > addressLimit) {
-					throw new IllegalStateException("This segment has been freed.");
-				}
-				else {
-					throw new IndexOutOfBoundsException();
-				}
-			}
-			else if (target.hasArray()) {
-				// move directly into the byte array
-				get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
-
-				// this must be after the get() call to ensure that the byte buffer is not
-				// modified in case the call fails
-				target.position(targetOffset + numBytes);
-			}
-			else {
-				// neither heap buffer nor direct buffer: copy exactly the requested
-				// number of bytes, not everything that fits into the buffer
-				for (int i = 0; i < numBytes; i++) {
-					target.put(get(offset++));
-				}
-			}
-		}
-	}
-
-	/**
-	 * Copies {@code numBytes} bytes from the given buffer, starting at its current position, into
-	 * this segment, starting at {@code offset}. On success, the buffer's position is advanced by
-	 * {@code numBytes}, regardless of the kind of buffer.
-	 *
-	 * @param offset The position in this segment to start writing at.
-	 * @param source The buffer to copy the bytes from.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the region is not within this segment.
-	 * @throws BufferUnderflowException Thrown, if the buffer has fewer than {@code numBytes} bytes remaining.
-	 */
-	@SuppressWarnings("restriction")
-	public final void put(int offset, ByteBuffer source, int numBytes) {
-		if (heapMemory != null) {
-			// ByteBuffer performs the boundary checks
-			source.get(heapMemory, offset, numBytes);
-		}
-		else {
-			// check the segment offset and length
-			if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			final int sourceOffset = source.position();
-			final int remaining = source.remaining();
-
-			if (remaining < numBytes) {
-				throw new BufferUnderflowException();
-			}
-
-			if (source.isDirect()) {
-				// copy from the source memory directly
-				final long sourcePointer = getAddress(source) + sourceOffset;
-				final long targetPointer = address + offset;
-
-				// the written region is the one in this segment, so it is the target
-				// pointer that must be validated against this segment's limit
-				if (targetPointer <= addressLimit - numBytes) {
-					UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
-
-					// advance the buffer like the heap and array paths do
-					source.position(sourceOffset + numBytes);
-				}
-				else if (address > addressLimit) {
-					throw new IllegalStateException("This segment has been freed.");
-				}
-				else {
-					throw new IndexOutOfBoundsException();
-				}
-			}
-			else if (source.hasArray()) {
-				// move directly from the byte array
-				put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
-
-				// this must be after the put() call to ensure that the byte buffer is not
-				// modified in case the call fails
-				source.position(sourceOffset + numBytes);
-			}
-			else {
-				// neither heap buffer nor direct buffer: copy exactly the requested
-				// number of bytes, not everything that remains in the buffer
-				for (int i = 0; i < numBytes; i++) {
-					put(offset++, source.get());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Copies {@code numBytes} bytes from this segment, starting at {@code offset}, to the target
-	 * segment, starting at {@code targetOffset}.
-	 *
-	 * @param offset The position in this segment to start reading from.
-	 * @param target The segment to copy the bytes to.
-	 * @param targetOffset The position in the target segment to start writing at.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if either segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if an offset or the length is negative, or if
-	 *                                   either region is not within its segment.
-	 */
-	@SuppressWarnings("restriction")
-	public final void copyTo(int offset, PureHybridMemorySegment target, int targetOffset, int numBytes) {
-		final byte[] thisHeapRef = this.heapMemory;
-		final byte[] otherHeapRef = target.heapMemory;
-		final long thisPointer = this.address + offset;
-		final long otherPointer = target.address + targetOffset;
-
-		// negative offsets must be rejected as well, otherwise the copy would
-		// touch memory in front of the segments
-		if ((numBytes | offset | targetOffset) >= 0 &
-				thisPointer <= this.addressLimit - numBytes & otherPointer <= target.addressLimit - numBytes) {
-			UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
-		}
-		else if (address > addressLimit | target.address > target.addressLimit) {
-			throw new IllegalStateException("segment has been freed.");
-		}
-		else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/**
-	 * Compares {@code len} bytes of this segment, starting at {@code offset1}, with {@code len}
-	 * bytes of the given segment, starting at {@code offset2}. Bytes are compared as unsigned
-	 * values, in eight-byte big-endian steps first and single bytes for the remainder.
-	 *
-	 * @param seg2 The segment to compare against.
-	 * @param offset1 The start of the region in this segment.
-	 * @param offset2 The start of the region in the other segment.
-	 * @param len The number of bytes to compare.
-	 * @return Zero if the regions are equal, a negative value if this segment's region is
-	 *         smaller, a positive value if it is larger.
-	 */
-	public int compare(PureHybridMemorySegment seg2, int offset1, int offset2, int len) {
-		while (len >= 8) {
-			long l1 = this.getLongBigEndian(offset1);
-			long l2 = seg2.getLongBigEndian(offset2);
-
-			if (l1 != l2) {
-				// unsigned comparison: the signed result flips if exactly one value is negative
-				return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
-			}
-
-			offset1 += 8;
-			offset2 += 8;
-			len -= 8;
-		}
-		while (len > 0) {
-			int b1 = this.get(offset1) & 0xff;
-			int b2 = seg2.get(offset2) & 0xff;
-			int cmp = b1 - b2;
-			if (cmp != 0) {
-				return cmp;
-			}
-			offset1++;
-			offset2++;
-			len--;
-		}
-		return 0;
-	}
-
-	/**
-	 * Swaps {@code len} bytes between this segment, starting at {@code offset1}, and the given
-	 * segment, starting at {@code offset2}. Regions shorter than 32 bytes are swapped value by
-	 * value; longer regions are swapped with three bulk copies through the temp buffer.
-	 *
-	 * @param tempBuffer The auxiliary buffer for the bulk swap; must hold at least {@code len}
-	 *                   bytes when {@code len} is 32 or more.
-	 * @param seg2 The segment to swap the bytes with.
-	 * @param offset1 The start of the region in this segment.
-	 * @param offset2 The start of the region in the other segment.
-	 * @param len The number of bytes to swap.
-	 * @throws IllegalStateException Thrown, if either segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if a region is not within its segment, or if the
-	 *                                   temp buffer is too small.
-	 */
-	public void swapBytes(byte[] tempBuffer, PureHybridMemorySegment seg2, int offset1, int offset2, int len) {
-		if (len < 32) {
-			// fast path for short copies
-			while (len >= 8) {
-				long tmp = this.getLong(offset1);
-				this.putLong(offset1, seg2.getLong(offset2));
-				seg2.putLong(offset2, tmp);
-				offset1 += 8;
-				offset2 += 8;
-				len -= 8;
-			}
-			while (len > 0) {
-				byte tmp = this.get(offset1);
-				this.put(offset1, seg2.get(offset2));
-				seg2.put(offset2, tmp);
-				offset1++;
-				offset2++;
-				len--;
-			}
-		}
-		else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) |
-				(this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length)
-		{
-			throw new IndexOutOfBoundsException();
-		}
-		else {
-			final long thisPos = this.address + offset1;
-			final long otherPos = seg2.address + offset2;
-
-			if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
-				final long arrayAddress = BYTE_ARRAY_BASE_OFFSET;
-
-				// this -> temp buffer
-				UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, arrayAddress, len);
-
-				// other -> this
-				UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);
-
-				// temp buffer -> other
-				UNSAFE.copyMemory(tempBuffer, arrayAddress, seg2.heapMemory, otherPos, len);
-			}
-			else if (this.address > this.addressLimit || seg2.address > seg2.addressLimit) {
-				// free() moves the address beyond the limit, it never makes it non-positive
-				throw new IllegalStateException("Memory segment has been freed.");
-			}
-			else {
-				// index is in fact invalid
-				throw new IndexOutOfBoundsException();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                     Utilities for native memory accesses and checks
-	// --------------------------------------------------------------------------------------------
-
-	/** The unsafe handle through which all memory accesses are performed. */
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	/** The offset of the first element within a byte array object, as reported by the unsafe handle. */
-	@SuppressWarnings("restriction")
-	private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-	/** The maximum number of bytes copied per call in the batched array copies. */
-	private static final long COPY_PER_BATCH = 1024 * 1024;
-
-	/** The reflective handle to the "address" field of {@link java.nio.Buffer}. */
-	private static final Field ADDRESS_FIELD;
-
-	static {
-		try {
-			ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
-			ADDRESS_FIELD.setAccessible(true);
-		}
-		catch (Throwable t) {
-			// keep the cause, otherwise the reason for the failure is lost
-			throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM.", t);
-		}
-	}
-
-	/**
-	 * Reads the value of the "address" field of the given buffer via reflection.
-	 *
-	 * @param buf The buffer whose address to read.
-	 * @return The value of the buffer's address field.
-	 * @throws RuntimeException Thrown, if the field cannot be accessed.
-	 */
-	private static long getAddress(ByteBuffer buf) {
-		try {
-			return (Long) ADDRESS_FIELD.get(buf);
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("Could not access direct byte buffer address.", t);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java
deleted file mode 100644
index cda48e1..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHybridMemorySegmentOutView.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.util.List;
-
-public final class PureHybridMemorySegmentOutView implements DataOutputView {
-
-	private PureHybridMemorySegment currentSegment;	// the current memory segment to write to
-
-	private int positionInSegment;					// the offset in the current segment
-	
-	private final int segmentSize;				// the size of the memory segments
-
-	private final  List<PureHybridMemorySegment> memorySource;
-	
-	private final List<PureHybridMemorySegment> fullSegments;
-	
-
-	private byte[] utfBuffer;		// the reusable array for UTF encodings
-
-
-	public PureHybridMemorySegmentOutView(List<PureHybridMemorySegment> emptySegments,
-										  List<PureHybridMemorySegment> fullSegmentTarget, int segmentSize) {
-		this.segmentSize = segmentSize;
-		this.currentSegment = emptySegments.remove(emptySegments.size() - 1);
-
-		this.memorySource = emptySegments;
-		this.fullSegments = fullSegmentTarget;
-		this.fullSegments.add(getCurrentSegment());
-	}
-
-
-	public void reset() {
-		if (this.fullSegments.size() != 0) {
-			throw new IllegalStateException("The target list still contains memory segments.");
-		}
-
-		clear();
-		try {
-			advance();
-		}
-		catch (IOException ioex) {
-			throw new RuntimeException("Error getting first segment for record collector.", ioex);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                  Page Management
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Hands out the next segment to write to. The segment is removed from the end of
-	 * the source list and appended to the list of full segments before it is returned.
-	 *
-	 * @param current The segment that was written to so far (not used here).
-	 * @param positionInCurrent The write position in that segment (not used here).
-	 * @return The segment taken from the source list.
-	 * @throws EOFException Thrown, if the source list holds no more segments.
-	 */
-	public PureHybridMemorySegment nextSegment(PureHybridMemorySegment current, int positionInCurrent) throws EOFException {
-		if (this.memorySource.isEmpty()) {
-			throw new EOFException();
-		}
-
-		final int lastIndex = this.memorySource.size() - 1;
-		final PureHybridMemorySegment taken = this.memorySource.remove(lastIndex);
-		this.fullSegments.add(taken);
-		return taken;
-	}
-	
-	public PureHybridMemorySegment getCurrentSegment() {
-		return this.currentSegment;
-	}
-
-	public int getCurrentPositionInSegment() {
-		return this.positionInSegment;
-	}
-	
-	public int getSegmentSize() {
-		return this.segmentSize;
-	}
-	
-	protected void advance() throws IOException {
-		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
-		this.positionInSegment = 0;
-	}
-	
-	protected void seekOutput(PureHybridMemorySegment seg, int position) {
-		this.currentSegment = seg;
-		this.positionInSegment = position;
-	}
-
-	protected void clear() {
-		this.currentSegment = null;
-		this.positionInSegment = 0;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                               Data Output Specific methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void write(int b) throws IOException {
-		writeByte(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		int remaining = this.segmentSize - this.positionInSegment;
-		if (remaining >= len) {
-			this.currentSegment.put(this.positionInSegment, b, off, len);
-			this.positionInSegment += len;
-		}
-		else {
-			if (remaining == 0) {
-				advance();
-				remaining = this.segmentSize - this.positionInSegment;
-			}
-			while (true) {
-				int toPut = Math.min(remaining, len);
-				this.currentSegment.put(this.positionInSegment, b, off, toPut);
-				off += toPut;
-				len -= toPut;
-
-				if (len > 0) {
-					this.positionInSegment = this.segmentSize;
-					advance();
-					remaining = this.segmentSize - this.positionInSegment;
-				}
-				else {
-					this.positionInSegment += toPut;
-					break;
-				}
-			}
-		}
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		writeByte(v ? 1 : 0);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize) {
-			this.currentSegment.put(this.positionInSegment++, (byte) v);
-		}
-		else {
-			advance();
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeShort(v);
-		}
-		else {
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 1) {
-			this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
-			this.positionInSegment += 2;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeChar(v);
-		}
-		else {
-			writeByte(v >> 8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 3) {
-			this.currentSegment.putIntBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 4;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeInt(v);
-		}
-		else {
-			writeByte(v >> 24);
-			writeByte(v >> 16);
-			writeByte(v >>  8);
-			writeByte(v);
-		}
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		if (this.positionInSegment < this.segmentSize - 7) {
-			this.currentSegment.putLongBigEndian(this.positionInSegment, v);
-			this.positionInSegment += 8;
-		}
-		else if (this.positionInSegment == this.segmentSize) {
-			advance();
-			writeLong(v);
-		}
-		else {
-			writeByte((int) (v >> 56));
-			writeByte((int) (v >> 48));
-			writeByte((int) (v >> 40));
-			writeByte((int) (v >> 32));
-			writeByte((int) (v >> 24));
-			writeByte((int) (v >> 16));
-			writeByte((int) (v >>  8));
-			writeByte((int) v);
-		}
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		writeInt(Float.floatToRawIntBits(v));
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		writeLong(Double.doubleToRawLongBits(v));
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		for (int i = 0; i < s.length(); i++) {
-			writeByte(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		for (int i = 0; i < s.length(); i++) {
-			writeChar(s.charAt(i));
-		}
-	}
-
-	/**
-	 * Writes the string as a two byte big-endian length prefix followed by the
-	 * encoded characters: chars in [0x0001, 0x007F] take one byte, chars above
-	 * 0x07FF take three bytes, all other chars (including 0x0000) take two bytes.
-	 *
-	 * @param str The string to write.
-	 * @throws UTFDataFormatException Thrown, if the encoded form exceeds 65535 bytes.
-	 * @throws IOException Thrown, if writing the encoded bytes fails.
-	 */
-	@Override
-	public void writeUTF(String str) throws IOException {
-		final int strlen = str.length();
-		int utflen = 0;
-		int c, count = 0;
-
-		// first pass: determine the encoded length
-		/* use charAt instead of copying String to char array */
-		for (int i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				utflen++;
-			} else if (c > 0x07FF) {
-				utflen += 3;
-			} else {
-				utflen += 2;
-			}
-		}
-
-		// the length prefix is only two bytes wide
-		if (utflen > 65535) {
-			throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
-		}
-
-		// grow the reusable encoding buffer only when it is too small
-		if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
-			this.utfBuffer = new byte[utflen + 2];
-		}
-		final byte[] bytearr = this.utfBuffer;
-
-		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) (utflen & 0xFF);
-
-		// second pass, part one: copy the leading run of single-byte characters
-		int i = 0;
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F))) {
-				break;
-			}
-			bytearr[count++] = (byte) c;
-		}
-
-		// second pass, part two: general encoding for the rest of the string
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				bytearr[count++] = (byte) c;
-
-			} else if (c > 0x07FF) {
-				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			} else {
-				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-			}
-		}
-
-		write(bytearr, 0, utflen + 2);
-	}
-
-	/**
-	 * Moves the write position forward by the given number of bytes without writing
-	 * anything, advancing to further segments whenever the current one is exhausted.
-	 * Does nothing if {@code numBytes} is not positive.
-	 *
-	 * @param numBytes The number of bytes to skip.
-	 * @throws IOException Thrown, if advancing to the next segment fails.
-	 */
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		int left = numBytes;
-
-		while (left > 0) {
-			final int free = this.segmentSize - this.positionInSegment;
-
-			if (left > free) {
-				// skip over the rest of this segment and continue in the next one
-				this.positionInSegment = this.segmentSize;
-				advance();
-				left -= free;
-			}
-			else {
-				this.positionInSegment += left;
-				break;
-			}
-		}
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		while (numBytes > 0) {
-			final int remaining = this.segmentSize - this.positionInSegment;
-			if (numBytes <= remaining) {
-				this.currentSegment.put(source, this.positionInSegment, numBytes);
-				this.positionInSegment += numBytes;
-				return;
-			}
-
-			if (remaining > 0) {
-				this.currentSegment.put(source, this.positionInSegment, remaining);
-				this.positionInSegment = this.segmentSize;
-				numBytes -= remaining;
-			}
-
-			advance();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java
deleted file mode 100644
index 1280242..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegment.java
+++ /dev/null
@@ -1,790 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public final class PureOffHeapMemorySegment {
-
-	/** Constant that flags the byte order. Because this is a boolean constant,
-	 * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */
-	private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
-	
-	/** The direct byte buffer that allocated the memory */
-	private ByteBuffer buffer;
-
-	/** The address to the off-heap data */
-	private long address;
-
-	/** The address one byte after the last addressable byte.
-	 *  This is address + size while the segment is not disposed */
-	private final long addressLimit;
-
-	/** The size in bytes of the memory segment */
-	private final int size;
-
-	// -------------------------------------------------------------------------
-	//                             Constructors
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Creates a new memory segment that represents the memory backing the given direct byte buffer.
-	 * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
-	 * otherwise this method will throw an IllegalArgumentException.
-	 *
-	 * @param buffer The byte buffer whose memory is represented by this memory segment.
-	 * @throws IllegalArgumentException Thrown, if the given ByteBuffer is null or not direct.
-	 * @throws RuntimeException Thrown, if the buffer's address is too large (see the check below).
-	 */
-	public PureOffHeapMemorySegment(ByteBuffer buffer) {
-		if (buffer == null || !buffer.isDirect()) {
-			throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer.");
-		}
-
-		this.buffer = buffer;
-		this.size = buffer.capacity();
-		this.address = getAddress(buffer);
-		this.addressLimit = this.address + size;
-
-		// NOTE(review): presumably this guards the 'address + index' computations of the
-		// accessors against long overflow for any int index - confirm
-		if (address >= Long.MAX_VALUE - Integer.MAX_VALUE) {
-			throw new RuntimeException("Segment initialized with too large address: " + address);
-		}
-	}
-
-	// -------------------------------------------------------------------------
-	//                      Direct Memory Segment Specifics
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Gets the buffer that owns the memory of this memory segment.
-	 *
-	 * @return The byte buffer that owns the memory of this memory segment.
-	 */
-	public ByteBuffer getBuffer() {
-		return this.buffer;
-	}
-
-	/**
-	 * Gets the memory address of the memory backing this memory segment.
-	 *
-	 * @return The memory start address of the memory backing this memory segment. 
-	 */
-	public long getAddress() {
-		return address;
-	}
-
-	// -------------------------------------------------------------------------
-	//                        MemorySegment Accessors
-	// -------------------------------------------------------------------------
-	
-	public final boolean isFreed() {
-		return this.address > this.addressLimit;
-	}
-	
-	public final void free() {
-		// this ensures we can place no more data and trigger
-		// the checks for the freed segment
-		this.address = this.addressLimit + 1;
-		this.buffer = null;
-	}
-	
-	public final int size() {
-		return this.size;
-	}
-
-	/**
-	 * Sets position and limit of the underlying byte buffer to the given region
-	 * and returns that buffer. No new buffer object is created, so every call
-	 * changes the state of the same buffer instance.
-	 *
-	 * @param offset The start of the region, becomes the buffer's position.
-	 * @param length The length of the region.
-	 * @return The underlying byte buffer, positioned on the given region.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if offset or length are negative,
-	 *                                   or the region exceeds the segment size.
-	 */
-	public ByteBuffer wrap(int offset, int length) {
-		// free() drops the buffer reference; fail descriptively rather than with an NPE
-		if (this.buffer == null) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-
-		// with a non-negative length, 'offset > size - length' also covers 'offset > size'
-		if (offset < 0 || length < 0 || offset > this.size - length) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		this.buffer.limit(offset + length);
-		this.buffer.position(offset);
-
-		return this.buffer;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//                    Random Access get() and put() methods
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("restriction")
-	public final byte get(int index) {
-
-		final long pos = address + index;
-		if (index >= 0 && pos < addressLimit) {
-			return UNSAFE.getByte(pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-	
-	@SuppressWarnings("restriction")
-	public final void put(int index, byte b) {
-
-		final long pos = address + index;
-		if (index >= 0 && pos < addressLimit) {
-			UNSAFE.putByte(pos, b);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-	
-	public final void get(int index, byte[] dst) {
-		get(index, dst, 0, dst.length);
-	}
-	
-	public final void put(int index, byte[] src) {
-		put(index, src, 0, src.length);
-	}
-	
-	/**
-	 * Bulk get method. Copies {@code length} bytes from this segment, starting at
-	 * position {@code index}, into {@code dst}, starting at position {@code offset}.
-	 *
-	 * @param index The position in the segment at which the first byte is read.
-	 * @param dst The array that receives the bytes.
-	 * @param offset The position in the array at which the first byte is stored.
-	 * @param length The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the array region or the segment region is invalid.
-	 */
-	@SuppressWarnings("restriction")
-	public final void get(int index, byte[] dst, int offset, int length) {
-
-		// check the byte array offset and length
-		if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		long pos = address + index;
-
-		if (index >= 0 && pos <= addressLimit - length) {
-			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
-
-			// the copy must proceed in batches not too large, because the JVM may
-			// poll for points that are safe for GC (moving the array and changing its address)
-			while (length > 0) {
-				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
-				UNSAFE.copyMemory(null, pos, dst, arrayAddress, toCopy);
-				length -= toCopy;
-				pos += toCopy;
-				arrayAddress += toCopy;
-			}
-		}
-		else if (address > addressLimit) {
-			// free() moves the address past the limit; this is the freed marker
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-	
-	/**
-	 * Bulk put method. Copies {@code length} bytes from {@code src}, starting at
-	 * position {@code offset}, into this segment, starting at position {@code index}.
-	 *
-	 * @param index The position in the segment at which the first byte is written.
-	 * @param src The array that supplies the bytes.
-	 * @param offset The position in the array at which the first byte is read.
-	 * @param length The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if the array region or the segment region is invalid.
-	 */
-	@SuppressWarnings("restriction")
-	public final void put(int index, byte[] src, int offset, int length) {
-		// check the byte array offset and length
-		if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		long pos = address + index;
-
-		if (index >= 0 && pos <= addressLimit - length) {
-
-			long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
-
-			// copy in bounded batches rather than with one single large call
-			while (length > 0) {
-				long toCopy = (length > COPY_PER_BATCH) ? COPY_PER_BATCH : length;
-				UNSAFE.copyMemory(src, arrayAddress, null, pos, toCopy);
-				length -= toCopy;
-				pos += toCopy;
-				arrayAddress += toCopy;
-			}
-		}
-		else if (address > addressLimit) {
-			// free() moves the address past the limit; this is the freed marker
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final boolean getBoolean(int index) {
-		return get(index) != 0;
-	}
-
-	public final void putBoolean(int index, boolean value) {
-		put(index, (byte) (value ? 1 : 0));
-	}
-
-	@SuppressWarnings("restriction")
-	public final char getChar(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			return UNSAFE.getChar(pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final char getCharLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getChar(index);
-		} else {
-			return Character.reverseBytes(getChar(index));
-		}
-	}
-
-	public final char getCharBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Character.reverseBytes(getChar(index));
-		} else {
-			return getChar(index);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final void putChar(int index, char value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			UNSAFE.putChar(pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final void putCharLittleEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, value);
-		} else {
-			putChar(index, Character.reverseBytes(value));
-		}
-	}
-
-	public final void putCharBigEndian(int index, char value) {
-		if (LITTLE_ENDIAN) {
-			putChar(index, Character.reverseBytes(value));
-		} else {
-			putChar(index, value);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final short getShort(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			return UNSAFE.getShort(pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final short getShortLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getShort(index);
-		} else {
-			return Short.reverseBytes(getShort(index));
-		}
-	}
-	
-	public final short getShortBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Short.reverseBytes(getShort(index));
-		} else {
-			return getShort(index);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final void putShort(int index, short value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 2) {
-			UNSAFE.putShort(pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final void putShortLittleEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, value);
-		} else {
-			putShort(index, Short.reverseBytes(value));
-		}
-	}
-	
-	public final void putShortBigEndian(int index, short value) {
-		if (LITTLE_ENDIAN) {
-			putShort(index, Short.reverseBytes(value));
-		} else {
-			putShort(index, value);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final int getInt(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 4) {
-			return UNSAFE.getInt(pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final int getIntLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getInt(index);
-		} else {
-			return Integer.reverseBytes(getInt(index));
-		}
-	}
-	
-	public final int getIntBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Integer.reverseBytes(getInt(index));
-		} else {
-			return getInt(index);
-		}
-	}
-	
-	@SuppressWarnings("restriction")
-	public final void putInt(int index, int value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 4) {
-			UNSAFE.putInt(pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final void putIntLittleEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, value);
-		} else {
-			putInt(index, Integer.reverseBytes(value));
-		}
-	}
-	
-	public final void putIntBigEndian(int index, int value) {
-		if (LITTLE_ENDIAN) {
-			putInt(index, Integer.reverseBytes(value));
-		} else {
-			putInt(index, value);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final long getLong(int index) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 8) {
-			return UNSAFE.getLong(pos);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final long getLongLittleEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return getLong(index);
-		} else {
-			return Long.reverseBytes(getLong(index));
-		}
-	}
-	
-	public final long getLongBigEndian(int index) {
-		if (LITTLE_ENDIAN) {
-			return Long.reverseBytes(getLong(index));
-		} else {
-			return getLong(index);
-		}
-	}
-
-	@SuppressWarnings("restriction")
-	public final void putLong(int index, long value) {
-		final long pos = address + index;
-		if (index >= 0 && pos <= addressLimit - 8) {
-			UNSAFE.putLong(pos, value);
-		}
-		else if (address > addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			// index is in fact invalid
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	public final void putLongLittleEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, value);
-		} else {
-			putLong(index, Long.reverseBytes(value));
-		}
-	}
-	
-	public final void putLongBigEndian(int index, long value) {
-		if (LITTLE_ENDIAN) {
-			putLong(index, Long.reverseBytes(value));
-		} else {
-			putLong(index, value);
-		}
-	}
-
-	public final float getFloat(int index) {
-		return Float.intBitsToFloat(getInt(index));
-	}
-	
-	public final float getFloatLittleEndian(int index) {
-		return Float.intBitsToFloat(getIntLittleEndian(index));
-	}
-	
-	public final float getFloatBigEndian(int index) {
-		return Float.intBitsToFloat(getIntBigEndian(index));
-	}
-	
-	public final void putFloat(int index, float value) {
-		putInt(index, Float.floatToRawIntBits(value));
-	}
-	
-	public final void putFloatLittleEndian(int index, float value) {
-		putIntLittleEndian(index, Float.floatToRawIntBits(value));
-	}
-	
-	public final void putFloatBigEndian(int index, float value) {
-		putIntBigEndian(index, Float.floatToRawIntBits(value));
-	}
-	
-	public final double getDouble(int index) {
-		return Double.longBitsToDouble(getLong(index));
-	}
-	
-	public final double getDoubleLittleEndian(int index) {
-		return Double.longBitsToDouble(getLongLittleEndian(index));
-	}
-
-	public final double getDoubleBigEndian(int index) {
-		return Double.longBitsToDouble(getLongBigEndian(index));
-	}
-	
-	public final void putDouble(int index, double value) {
-		putLong(index, Double.doubleToRawLongBits(value));
-	}
-	
-	public final void putDoubleLittleEndian(int index, double value) {
-		putLongLittleEndian(index, Double.doubleToRawLongBits(value));
-	}
-	
-	public final void putDoubleBigEndian(int index, double value) {
-		putLongBigEndian(index, Double.doubleToRawLongBits(value));
-	}
-
-	// -------------------------------------------------------------------------
-	//                     Bulk Read and Write Methods
-	// -------------------------------------------------------------------------
-
-	public final void get(DataOutput out, int offset, int length) throws IOException {
-		while (length >= 8) {
-			out.writeLong(getLongBigEndian(offset));
-			offset += 8;
-			length -= 8;
-		}
-
-		while(length > 0) {
-			out.writeByte(get(offset));
-			offset++;
-			length--;
-		}
-	}
-
-	public final void put(DataInput in, int offset, int length) throws IOException {
-		while (length >= 8) {
-			putLongBigEndian(offset, in.readLong());
-			offset += 8;
-			length -= 8;
-		}
-		while(length > 0) {
-			put(offset, in.readByte());
-			offset++;
-			length--;
-		}
-	}
-
-	/**
-	 * Bulk get method. Copies {@code numBytes} bytes from this segment, starting at
-	 * position {@code offset}, into the target buffer at its current position.
-	 * On success, the position of the target buffer is advanced by {@code numBytes},
-	 * regardless of the kind of buffer.
-	 *
-	 * @param offset The position in the segment at which the first byte is read.
-	 * @param target The buffer that receives the bytes.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IndexOutOfBoundsException Thrown, if the segment region is invalid.
-	 * @throws BufferOverflowException Thrown, if the target has fewer than numBytes bytes remaining.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 */
-	@SuppressWarnings("restriction")
-	public final void get(int offset, ByteBuffer target, int numBytes) {
-
-		// check the segment offset and length
-		if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		final int targetOffset = target.position();
-		final int remaining = target.remaining();
-
-		if (remaining < numBytes) {
-			throw new BufferOverflowException();
-		}
-
-		if (target.isDirect()) {
-			// copy to the target memory directly
-			final long targetPointer = getAddress(target) + targetOffset;
-			final long sourcePointer = address + offset;
-
-			if (sourcePointer <= addressLimit - numBytes) {
-				UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
-
-				// advance the position like the other branches do, only after a successful copy
-				target.position(targetOffset + numBytes);
-			}
-			else if (address > addressLimit) {
-				throw new IllegalStateException("This segment has been freed.");
-			}
-			else {
-				throw new IndexOutOfBoundsException();
-			}
-		}
-		else if (target.hasArray()) {
-			// move directly into the byte array
-			get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
-
-			// this must be after the get() call to ensure that the byte buffer is not
-			// modified in case the call fails
-			target.position(targetOffset + numBytes);
-		}
-		else {
-			// neither heap buffer nor direct buffer: copy byte-wise, exactly numBytes bytes
-			// (not until the target is full, which may be more than was requested)
-			final int end = offset + numBytes;
-			while (offset < end) {
-				target.put(get(offset++));
-			}
-		}
-	}
-
-	/**
-	 * Bulk put method. Copies {@code numBytes} bytes from the source buffer, starting
-	 * at its current position, into this segment, starting at position {@code offset}.
-	 * On success, the position of the source buffer is advanced by {@code numBytes},
-	 * regardless of the kind of buffer.
-	 *
-	 * @param offset The position in the segment at which the first byte is written.
-	 * @param source The buffer that supplies the bytes.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IndexOutOfBoundsException Thrown, if the segment region is invalid.
-	 * @throws BufferUnderflowException Thrown, if the source has fewer than numBytes bytes remaining.
-	 * @throws IllegalStateException Thrown, if the segment has been freed.
-	 */
-	@SuppressWarnings("restriction")
-	public final void put(int offset, ByteBuffer source, int numBytes) {
-
-		// check the segment offset and length
-		if ((offset | numBytes | (offset + numBytes) | (size - (offset + numBytes))) < 0) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		final int sourceOffset = source.position();
-		final int remaining = source.remaining();
-
-		if (remaining < numBytes) {
-			throw new BufferUnderflowException();
-		}
-
-		if (source.isDirect()) {
-			// copy from the source memory directly
-			final long sourcePointer = getAddress(source) + sourceOffset;
-			final long targetPointer = address + offset;
-
-			// the written region lies in THIS segment, so the target pointer must be
-			// checked against this segment's limit (the source address is unrelated to it)
-			if (targetPointer <= addressLimit - numBytes) {
-				UNSAFE.copyMemory(sourcePointer, targetPointer, numBytes);
-
-				// advance the position like the other branches do, only after a successful copy
-				source.position(sourceOffset + numBytes);
-			}
-			else if (address > addressLimit) {
-				throw new IllegalStateException("This segment has been freed.");
-			}
-			else {
-				throw new IndexOutOfBoundsException();
-			}
-		}
-		else if (source.hasArray()) {
-			// move directly out of the byte array
-			put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
-
-			// this must be after the put() call to ensure that the byte buffer is not
-			// modified in case the call fails
-			source.position(sourceOffset + numBytes);
-		}
-		else {
-			// neither heap buffer nor direct buffer: copy byte-wise, exactly numBytes bytes
-			// (not until the source is drained, which may be more than was requested)
-			final int end = offset + numBytes;
-			while (offset < end) {
-				put(offset++, source.get());
-			}
-		}
-	}
-
-	/**
-	 * Bulk copy method. Copies {@code numBytes} bytes from this segment, starting at
-	 * position {@code offset}, to the target segment, starting at {@code targetOffset}.
-	 *
-	 * @param offset The position in this segment at which the first byte is read.
-	 * @param target The segment that receives the bytes.
-	 * @param targetOffset The position in the target segment at which the first byte is written.
-	 * @param numBytes The number of bytes to copy.
-	 * @throws IllegalStateException Thrown, if either segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if an offset or the length is negative,
-	 *                                   or a region exceeds its segment.
-	 */
-	@SuppressWarnings("restriction")
-	public final void copyTo(int offset, PureOffHeapMemorySegment target, int targetOffset, int numBytes) {
-		final long thisPointer = address + offset;
-		final long otherPointer = target.address + targetOffset;
-
-		// the sign check rejects negative offsets, which would otherwise pass the upper
-		// bound checks and address memory in front of the segments
-		if ((numBytes | offset | targetOffset) >= 0 &&
-				thisPointer <= addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes)
-		{
-			UNSAFE.copyMemory(thisPointer, otherPointer, numBytes);
-		}
-		else if (address > addressLimit || target.address > target.addressLimit) {
-			throw new IllegalStateException("This segment has been freed.");
-		}
-		else {
-			throw new IndexOutOfBoundsException();
-		}
-	}
-
-	/**
-	 * Compares {@code len} bytes of this segment, starting at {@code offset1}, with
-	 * {@code len} bytes of {@code seg2}, starting at {@code offset2}. Bytes are compared
-	 * as unsigned values, the first difference decides.
-	 *
-	 * @param seg2 The segment to compare against.
-	 * @param offset1 The start of the compared region in this segment.
-	 * @param offset2 The start of the compared region in seg2.
-	 * @param len The number of bytes to compare.
-	 * @return 0 if the regions are equal, a negative value if this region is smaller,
-	 *         a positive value if it is larger.
-	 */
-	public int compare(MemorySegment seg2, int offset1, int offset2, int len) {
-		// compare eight bytes at a time while possible; reading big endian puts the
-		// byte at the lowest offset into the most significant position
-		while (len >= 8) {
-			long l1 = this.getLongBigEndian(offset1);
-			long l2 = seg2.getLongBigEndian(offset2);
-
-			if (l1 != l2) {
-				// unsigned comparison: the signed result is flipped when exactly one
-				// of the two values has its sign bit set
-				return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
-			}
-
-			offset1 += 8;
-			offset2 += 8;
-			len -= 8;
-		}
-		// compare the remaining (less than eight) bytes one by one
-		while (len > 0) {
-			// mask to obtain the unsigned byte values
-			int b1 = this.get(offset1) & 0xff;
-			int b2 = seg2.get(offset2) & 0xff;
-			int cmp = b1 - b2;
-			if (cmp != 0) {
-				return cmp;
-			}
-			offset1++;
-			offset2++;
-			len--;
-		}
-		return 0;
-	}
-
-	/**
-	 * Swaps {@code len} bytes of this segment, starting at {@code offset1}, with
-	 * {@code len} bytes of {@code seg2}, starting at {@code offset2}.
-	 *
-	 * @param tempBuffer Scratch array for swaps of 32 or more bytes; must hold at least len bytes then.
-	 * @param seg2 The segment to swap bytes with.
-	 * @param offset1 The start of the swapped region in this segment.
-	 * @param offset2 The start of the swapped region in seg2.
-	 * @param len The number of bytes to swap.
-	 * @throws IllegalStateException Thrown, if either segment has been freed.
-	 * @throws IndexOutOfBoundsException Thrown, if a region is invalid or the scratch array is too small.
-	 */
-	public void swapBytes(byte[] tempBuffer, PureOffHeapMemorySegment seg2, int offset1, int offset2, int len) {
-		if (len < 32) {
-			// fast path for short copies; the accessors perform the bounds and freed checks
-			while (len >= 8) {
-				long tmp = this.getLong(offset1);
-				this.putLong(offset1, seg2.getLong(offset2));
-				seg2.putLong(offset2, tmp);
-				offset1 += 8;
-				offset2 += 8;
-				len -= 8;
-			}
-			while (len > 0) {
-				byte tmp = this.get(offset1);
-				this.put(offset1, seg2.get(offset2));
-				seg2.put(offset2, tmp);
-				offset1++;
-				offset2++;
-				len--;
-			}
-		}
-		else if ( (offset1 | offset2 | len | (offset1 + len) | (offset2 + len) |
-				(this.size - (offset1 + len)) | (seg2.size() - (offset2 + len))) < 0 || len > tempBuffer.length)
-		{
-			throw new IndexOutOfBoundsException();
-		}
-		else {
-			final long thisPos = this.address + offset1;
-			final long otherPos = seg2.address + offset2;
-
-			if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
-				final long arrayAddress = BYTE_ARRAY_BASE_OFFSET;
-
-				// this -> temp buffer
-				UNSAFE.copyMemory(null, thisPos, tempBuffer, arrayAddress, len);
-
-				// other -> this
-				UNSAFE.copyMemory(null, otherPos, null, thisPos, len);
-
-				// temp buffer -> other
-				UNSAFE.copyMemory(tempBuffer, arrayAddress, null, otherPos, len);
-			}
-			else if (this.address > this.addressLimit || seg2.address > seg2.addressLimit) {
-				// free() moves the address past the limit; this is the freed marker
-				throw new IllegalStateException("Memory segment has been freed.");
-			}
-			else {
-				// index is in fact invalid
-				throw new IndexOutOfBoundsException();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                     Utilities for native memory accesses and checks
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	@SuppressWarnings("restriction")
-	private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-	private static final long COPY_PER_BATCH = 1024 * 1024;
-
-	private static final Field ADDRESS_FIELD;
-
-	// Looks up the private 'address' field of java.nio.Buffer once and makes it
-	// accessible, so that getAddress(ByteBuffer) can read it reflectively.
-	static {
-		try {
-			ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
-			ADDRESS_FIELD.setAccessible(true);
-		}
-		catch (Throwable t) {
-			// keep the original failure as the cause, otherwise the reason is lost
-			throw new RuntimeException("Cannot initialize DirectMemorySegment - direct memory not supported by the JVM.", t);
-		}
-	}
-
-	/**
-	 * Reads the value of the {@code address} field of the given buffer via reflection.
-	 *
-	 * @param buf The buffer whose {@code address} field is read.
-	 * @return The value of the buffer's {@code address} field.
-	 * @throws RuntimeException Thrown, if the field could not be read.
-	 */
-	private static long getAddress(ByteBuffer buf) {
-		try {
-			// primitive accessor: avoids boxing the address into a Long on every call
-			return ADDRESS_FIELD.getLong(buf);
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("Could not access direct byte buffer address.", t);
-		}
-	}
-}