You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:59:04 UTC

[14/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant of the managed memory

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index ce080d3..31d5563 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.memory;
 
 import java.io.DataInput;
@@ -26,26 +25,39 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 /**
- * This class represents a piece of memory allocated from the memory manager. The segment is backed
- * by a byte array and features random put and get methods for the basic types that are stored in a byte-wise
- * fashion in the memory.
- * 
+ * This class represents a piece of memory managed by Flink.
+ * The segment may be backed by heap memory (byte array) or by off-heap memory.
+ * <p>
+ * The methods for individual memory access are specialized in the classes
+ * {@link org.apache.flink.core.memory.HeapMemorySegment} and
+ * {@link org.apache.flink.core.memory.HybridMemorySegment}.
+ * All methods that operate across two memory segments are implemented in this class,
+ * to transparently handle the mixing of memory segment types.
  * <p>
+ * This class fulfills conceptually a similar purpose as Java's {@link java.nio.ByteBuffer}.
+ * We add this specialized class for various reasons:
+ * <ul>
+ *     <li>It offers additional binary compare, swap, and copy methods.</li>
+ *     <li>It uses collapsed checks for range check and memory segment disposal.</li>
+ *     <li>It offers absolute positioning methods for bulk put/get methods, to guarantee
+ *         thread safe use.</li>
+ *     <li>It offers explicit big-endian / little-endian access methods, rather than tracking internally
+ *         a byte order.</li>
+ *     <li>It transparently and efficiently moves data between on-heap and off-heap variants.</li>
+ * </ul>
  * 
- * Comments on the implementation: We make heavy use of operations that are supported by native
+ * <i>Comments on the implementation</i>:
+ * We make heavy use of operations that are supported by native
  * instructions, to achieve a high efficiency. Multi byte types (int, long, float, double, ...)
- * are read and written with "unsafe" native commands. Little-endian to big-endian conversion and
- * vice versa are done using the static <i>reverseBytes</i> methods in the boxing data types
- * (for example {@link Integer#reverseBytes(int)}). On x86/amd64, these are translated by the
- * jit compiler to <i>bswap</i> intrinsic commands.
- * 
- * Below is an example of the code generated for the {@link MemorySegment#putLongBigEndian(int, long)}
- * function by the just-in-time compiler. The code is grabbed from an oracle jvm 7 using the
+ * are read and written with "unsafe" native commands.
+ * <p>
+ * Below is an example of the code generated for the {@link HeapMemorySegment#putLongBigEndian(int, long)}
+ * function by the just-in-time compiler. The code is grabbed from an Oracle JVM 7 using the
  * hotspot disassembler library (hsdis32.dll) and the jvm command
- * <i>-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=print,*UnsafeMemorySegment.putLongBigEndian</i>.
+ * <i>-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=print,*MemorySegment.putLongBigEndian</i>.
  * Note that this code realizes both the byte order swapping and the reinterpret cast access to
  * get a long from the byte array.
- * 
+ *
  * <pre>
  * [Verified Entry Point]
  *   0x00007fc403e19920: sub    $0x18,%rsp
@@ -66,286 +78,539 @@ import java.nio.ByteOrder;
  *                                                 ;   {poll_return}
  *   0x00007fc403e1994a: retq 
  * </pre>
+ *
+ * <i>Note on efficiency</i>:
+ * For best efficiency, the code that uses this class should make sure that only one
+ * subclass is loaded, or that the methods that are abstract in this class are used only from one of the
+ * subclasses (either the {@link org.apache.flink.core.memory.HeapMemorySegment}, or the 
+ * {@link org.apache.flink.core.memory.HybridMemorySegment}).
+ * 
+ * That way, all the abstract methods in the MemorySegment base class have only one loaded
+ * actual implementation. This is easy for the JIT to recognize through class hierarchy analysis,
+ * or by identifying that the invocations are monomorphic (all go to the same concrete
+ * method implementation). Under these conditions, the JIT can perfectly inline methods.
  */
-public class MemorySegment {
+public abstract class MemorySegment {
+
+	/** The unsafe handle for transparent memory copied (heap / off-heap) */
+	@SuppressWarnings("restriction")
+	protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+	/** The beginning of the byte array contents, relative to the byte array object */
+	@SuppressWarnings("restriction")
+	protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+	
+	/** Constant that flags the byte order. Because this is a boolean constant,
+	 * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */
+	private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+	
+	// ------------------------------------------------------------------------
+
+	/** The heap byte array object relative to which we access the memory. Is non-null if the
+	 *  memory is on the heap, and is null, if the memory if off the heap. If we have this buffer, we
+	 *  must never void this reference, or the memory segment will point to undefined addresses 
+	 *  outside the heap and may in out-of-order execution cases cause segmentation faults. */
+	protected final byte[] heapMemory;
+
+	/** The address to the data, relative to the heap memory byte array. If the heap memory byte array
+	 * is null, this becomes an absolute memory address outside the heap. */
+	protected long address;
+
+	/** The address one byte after the last addressable byte.
+	 *  This is address + size while the segment is not disposed */
+	protected final long addressLimit;
 	
-	// flag to enable / disable boundary checks. Note that the compiler eliminates the
-	// code paths of the checks (as dead code) when this constant is set to false.
-	private static final boolean CHECKED = true;
+	/** The size in bytes of the memory segment */
+	protected final int size;
 	
+	/** Optional owner of the memory segment */
+	private final Object owner;
+
 	/**
-	 * The array in which the data is stored.
+	 * Creates a new memory segment that represents the memory of the byte array.
+	 * Since the byte array is backed by on-heap memory, this memory segment holds its
+	 * data on heap. The buffer must be at least of size 8 bytes.
+	 *
+	 * @param buffer The byte array whose memory is represented by this memory segment.
 	 */
-	protected byte[] memory;
-	
+	MemorySegment(byte[] buffer, Object owner) {
+		if (buffer == null) {
+			throw new NullPointerException("buffer");
+		}
+		
+		this.heapMemory = buffer;
+		this.address = BYTE_ARRAY_BASE_OFFSET;
+		this.size = buffer.length;
+		this.addressLimit = this.address + this.size;
+		this.owner = owner;
+	}
+
 	/**
-	 * Wrapper for I/O requests.
+	 * Creates a new memory segment that represents the memory at the absolute address given
+	 * by the pointer.
+	 *
+	 * @param offHeapAddress The address of the memory represented by this memory segment.
+	 * @param size The size of this memory segment.
 	 */
-	protected ByteBuffer wrapper;
+	MemorySegment(long offHeapAddress, int size, Object owner) {
+		if (offHeapAddress <= 0) {
+			throw new IllegalArgumentException("negative pointer or size");
+		}
+		if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
+			// this is necessary to make sure the collapsed checks are safe against numeric overflows
+			throw new IllegalArgumentException("Segment initialized with too large address: " + address
+					+ " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
+		}
+		
+		this.heapMemory = null;
+		this.address = offHeapAddress;
+		this.addressLimit = this.address + size;
+		this.size = size;
+		this.owner = owner;
+	}
 	
-	// -------------------------------------------------------------------------
-	//                             Constructors
-	// -------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	// Memory Segment Operations
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new memory segment that represents the data in the given byte array.
-	 * 
-	 * @param memory The byte array that holds the data.
+	 * Gets the size of the memory segment, in bytes.
+	 * @return The size of the memory segment.
 	 */
-	public MemorySegment(byte[] memory) {
-		this.memory = memory;
+	public int size() {
+		return size;
 	}
 
-	// -------------------------------------------------------------------------
-	//                        MemorySegment Accessors
-	// -------------------------------------------------------------------------
-	
 	/**
-	 * Checks whether this memory segment has already been freed. In that case, the
-	 * segment must not be used any more.
-	 * 
-	 * @return True, if the segment has been freed, false otherwise.
+	 * Checks whether the memory segment was freed.
+	 * @return True, if the memory segment has been freed, false otherwise.
 	 */
-	public final boolean isFreed() {
-		return this.memory == null;
+	public boolean isFreed() {
+		return address > addressLimit;
 	}
 
-	public final void free() {
-		this.wrapper = null;
-		this.memory = null;
+	/**
+	 * Frees this memory segment. After this operation has been called, no further operations are
+	 * possible on the memory segment and will fail. The actual memory (heap or off-heap) will only
+	 * be released after this memory segment object has become garbage collected. 
+	 */
+	public void free() {
+		// this ensures we can place no more data and trigger
+		// the checks for the freed segment
+		address = addressLimit + 1;
 	}
-	
+
 	/**
-	 * Gets the size of the memory segment, in bytes. Because segments
-	 * are backed by arrays, they cannot be larger than two GiBytes.
-	 * 
-	 * @return The size in bytes.
+	 * Checks whether this memory segment is backed by off-heap memory.
+	 * @return True, if the memory segment is backed by off-heap memory, false if it is backed
+	 *         by heap memory.
 	 */
-	public final int size() {
-		return this.memory.length;
+	public boolean isOffHeap() {
+		return heapMemory == null;
 	}
 
 	/**
 	 * Wraps the chunk of the underlying memory located between <tt>offset<tt> and 
 	 * <tt>length</tt> in a NIO ByteBuffer.
-	 * 
+	 *
 	 * @param offset The offset in the memory segment.
 	 * @param length The number of bytes to be wrapped as a buffer.
 	 * @return A <tt>ByteBuffer</tt> backed by the specified portion of the memory segment.
 	 * @throws IndexOutOfBoundsException Thrown, if offset is negative or larger than the memory segment size,
 	 *                                   or if the offset plus the length is larger than the segment size.
 	 */
-	public ByteBuffer wrap(int offset, int length) {
-		if (offset > this.memory.length || offset > this.memory.length - length) {
-			throw new IndexOutOfBoundsException();
-		}
-		
-		if (this.wrapper == null) {
-			this.wrapper = ByteBuffer.wrap(this.memory, offset, length);
-		}
-		else {
-			this.wrapper.limit(offset + length);
-			this.wrapper.position(offset);
-		}
+	public abstract ByteBuffer wrap(int offset, int length);
 
-		return this.wrapper;
+	/**
+	 * Gets the owner of this memory segment. Returns null, if the owner was not set.
+	 * @return The owner of the memory segment, or null, if it does not have an owner.
+	 */
+	public Object getOwner() {
+		return owner;
 	}
-
+	
+	
 	// ------------------------------------------------------------------------
 	//                    Random Access get() and put() methods
 	// ------------------------------------------------------------------------
 
-	// --------------------------------------------------------------------------------------------
-	// WARNING: Any code for range checking must take care to avoid integer overflows. The position
-	// integer may go up to <code>Integer.MAX_VALUE</tt>. Range checks that work after the principle
-	// <code>position + 3 &lt; end</code> may fail because <code>position + 3</code> becomes negative.
-	// A safe solution is to subtract the delta from the limit, for example
-	// <code>position &lt; end - 3</code>. Since all indices are always positive, and the integer domain
-	// has one more negative value than positive values, this can never cause an underflow.
-	// --------------------------------------------------------------------------------------------
-
-
+	//------------------------------------------------------------------------
+	// Notes on the implementation: We try to collapse as many checks as
+	// possible. We need to obey the following rules to make this safe
+	// against segfaults:
+	// 
+	//  - Grab mutable fields onto the stack before checking and using. This
+	//    guards us against concurrent modifications which invalidate the
+	//    pointers
+	//  - Use subtrations for range checks, as they are tolerant 
+	//------------------------------------------------------------------------
+	
 	/**
 	 * Reads the byte at the given position.
-	 * 
+	 *
 	 * @param index The position from which the byte will be read
 	 * @return The byte at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger or equal to the size of
 	 *                                   the memory segment.
 	 */
-	public final byte get(int index) {
-		return this.memory[index];
-	}
+	public abstract byte get(int index);
 
 	/**
 	 * Writes the given byte into this buffer at the given position.
-	 * 
+	 *
 	 * @param index The index at which the byte will be written.
 	 * @param b The byte value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger or equal to the size of
 	 *                                   the memory segment.
 	 */
-	public final void put(int index, byte b) {
-		this.memory[index] = b;
-	}
+	public abstract void put(int index, byte b);
 
 	/**
 	 * Bulk get method. Copies dst.length memory from the specified position to
 	 * the destination memory.
-	 * 
+	 *
 	 * @param index The position at which the first byte will be read.
 	 * @param dst The memory into which the memory will be copied.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or too large that the data between the 
 	 *                                   index and the memory segment end is not enough to fill the destination array.
 	 */
-	public final void get(int index, byte[] dst) {
-		get(index, dst, 0, dst.length);
-	}
+	public abstract void get(int index, byte[] dst);
 
 	/**
 	 * Bulk put method. Copies src.length memory from the source memory into the
 	 * memory segment beginning at the specified position.
-	 * 
+	 *
 	 * @param index The index in the memory segment array, where the data is put.
 	 * @param src The source array to copy the data from.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or too large such that the array 
 	 *                                   size exceed the amount of memory between the index and the memory
 	 *                                   segment's end. 
 	 */
-	public final void put(int index, byte[] src) {
-		put(index, src, 0, src.length);
-	}
+	public abstract void put(int index, byte[] src);
 
 	/**
 	 * Bulk get method. Copies length memory from the specified position to the
 	 * destination memory, beginning at the given offset
-	 * 
+	 *
 	 * @param index The position at which the first byte will be read.
 	 * @param dst The memory into which the memory will be copied.
 	 * @param offset The copying offset in the destination memory.
 	 * @param length The number of bytes to be copied.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or too large that the requested number of 
 	 *                                   bytes exceed the amount of memory between the index and the memory
 	 *                                   segment's end.
 	 */
-	public final void get(int index, byte[] dst, int offset, int length) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(this.memory, index, dst, offset, length);
-	}
+	public abstract void get(int index, byte[] dst, int offset, int length);
 
 	/**
 	 * Bulk put method. Copies length memory starting at position offset from
 	 * the source memory into the memory segment starting at the specified
 	 * index.
-	 * 
+	 *
 	 * @param index The position in the memory segment array, where the data is put.
 	 * @param src The source array to copy the data from.
 	 * @param offset The offset in the source array where the copying is started.
 	 * @param length The number of bytes to copy.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or too large such that the array 
 	 *                                   portion to copy exceed the amount of memory between the index and the memory
 	 *                                   segment's end.
 	 */
-	public final void put(int index, byte[] src, int offset, int length) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(src, offset, this.memory, index, length);
-	}
+	public abstract void put(int index, byte[] src, int offset, int length);
 
 	/**
 	 * Reads one byte at the given position and returns its boolean
 	 * representation.
-	 * 
+	 *
 	 * @param index The position from which the memory will be read.
 	 * @return The boolean value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 1.
 	 */
-	public final boolean getBoolean(int index) {
-		return this.memory[index] != 0;
-	}
+	public abstract boolean getBoolean(int index);
 
 	/**
 	 * Writes one byte containing the byte value into this buffer at the given
 	 * position.
-	 * 
+	 *
 	 * @param index The position at which the memory will be written.
 	 * @param value The char value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 1.
 	 */
-	public final void putBoolean(int index, boolean value) {
-		this.memory[index] = (byte) (value ? 1 : 0);
-	}
+	public abstract void putBoolean(int index, boolean value);
 
 	/**
-	 * Reads two memory at the given position, composing them into a char value
-	 * according to the current byte order.
-	 * 
+	 * Reads a char value from the given position, in the system's native byte order.
+	 *
 	 * @param index The position from which the memory will be read.
 	 * @return The char value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 2.
 	 */
+	@SuppressWarnings("restriction")
 	public final char getChar(int index) {
-		return (char) ( ((this.memory[index    ] & 0xff) << 8) | 
-						(this.memory[index + 1] & 0xff) );
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 2) {
+			return UNSAFE.getChar(heapMemory, pos);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("This segment has been freed.");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
 	}
 
 	/**
-	 * Writes two memory containing the given char value, in the current byte
-	 * order, into this buffer at the given position.
-	 * 
+	 * Reads an character value (16 bit, 2 bytes) from the given position, in little-endian byte order.
+	 * This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #getChar(int)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #getChar(int)} is the preferable choice.
+	 *
+	 * @param index The position from which the value will be read.
+	 * @return The character value at the given position.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final char getCharLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getChar(index);
+		} else {
+			return Character.reverseBytes(getChar(index));
+		}
+	}
+
+	/**
+	 * Reads an character value (16 bit, 2 bytes) from the given position, in big-endian byte order.
+	 * This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #getChar(int)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #getChar(int)} is the preferable choice.
+	 *
+	 * @param index The position from which the value will be read.
+	 * @return The character value at the given position.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final char getCharBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Character.reverseBytes(getChar(index));
+		} else {
+			return getChar(index);
+		}
+	}
+
+	/**
+	 * Writes a char value to teh given position, in the system's native byte order.
+	 *
 	 * @param index The position at which the memory will be written.
 	 * @param value The char value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 2.
 	 */
+	@SuppressWarnings("restriction")
 	public final void putChar(int index, char value) {
-		this.memory[index    ] = (byte) (value >> 8);
-		this.memory[index + 1] = (byte) value;
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 2) {
+			UNSAFE.putChar(heapMemory, pos, value);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	/**
+	 * Writes the given character (16 bit, 2 bytes) to the given position in little-endian
+	 * byte order. This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #putChar(int, char)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #putChar(int, char)} is the preferable choice.
+	 *
+	 * @param index The position at which the value will be written.
+	 * @param value The short value to be written.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final void putCharLittleEndian(int index, char value) {
+		if (LITTLE_ENDIAN) {
+			putChar(index, value);
+		} else {
+			putChar(index, Character.reverseBytes(value));
+		}
+	}
+
+	/**
+	 * Writes the given character (16 bit, 2 bytes) to the given position in big-endian
+	 * byte order. This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #putChar(int, char)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #putChar(int, char)} is the preferable choice.
+	 *
+	 * @param index The position at which the value will be written.
+	 * @param value The short value to be written.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final void putCharBigEndian(int index, char value) {
+		if (LITTLE_ENDIAN) {
+			putChar(index, Character.reverseBytes(value));
+		} else {
+			putChar(index, value);
+		}
 	}
 
 	/**
 	 * Reads two memory at the given position, composing them into a short value
 	 * according to the current byte order.
-	 * 
+	 *
 	 * @param index The position from which the memory will be read.
 	 * @return The short value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 2.
 	 */
 	public final short getShort(int index) {
-		return (short) (
-				((this.memory[index    ] & 0xff) << 8) |
-				((this.memory[index + 1] & 0xff)) );
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 2) {
+			return UNSAFE.getShort(heapMemory, pos);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	/**
+	 * Reads an short integer value (16 bit, 2 bytes) from the given position, in little-endian byte order.
+	 * This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #getShort(int)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #getShort(int)} is the preferable choice.
+	 *
+	 * @param index The position from which the value will be read.
+	 * @return The short value at the given position.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final short getShortLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getShort(index);
+		} else {
+			return Short.reverseBytes(getShort(index));
+		}
+	}
+
+	/**
+	 * Reads an short integer value (16 bit, 2 bytes) from the given position, in big-endian byte order.
+	 * This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #getShort(int)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #getShort(int)} is the preferable choice.
+	 *
+	 * @param index The position from which the value will be read.
+	 * @return The short value at the given position.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final short getShortBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Short.reverseBytes(getShort(index));
+		} else {
+			return getShort(index);
+		}
 	}
 
 	/**
 	 * Writes the given short value into this buffer at the given position, using
 	 * the native byte order of the system.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The short value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 2.
 	 */
 	public final void putShort(int index, short value) {
-		this.memory[index    ] = (byte) (value >> 8);
-		this.memory[index + 1] = (byte) value;
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 2) {
+			UNSAFE.putShort(heapMemory, pos, value);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
 	}
-	
+
+	/**
+	 * Writes the given short integer value (16 bit, 2 bytes) to the given position in little-endian
+	 * byte order. This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #putShort(int, short)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #putShort(int, short)} is the preferable choice.
+	 *
+	 * @param index The position at which the value will be written.
+	 * @param value The short value to be written.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final void putShortLittleEndian(int index, short value) {
+		if (LITTLE_ENDIAN) {
+			putShort(index, value);
+		} else {
+			putShort(index, Short.reverseBytes(value));
+		}
+	}
+
+	/**
+	 * Writes the given short integer value (16 bit, 2 bytes) to the given position in big-endian
+	 * byte order. This method's speed depends on the system's native byte order, and it
+	 * is possibly slower than {@link #putShort(int, short)}. For most cases (such as 
+	 * transient storage in memory or serialization for I/O and network),
+	 * it suffices to know that the byte order in which the value is written is the same as the
+	 * one in which it is read, and {@link #putShort(int, short)} is the preferable choice.
+	 *
+	 * @param index The position at which the value will be written.
+	 * @param value The short value to be written.
+	 *
+	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment size minus 2.
+	 */
+	public final void putShortBigEndian(int index, short value) {
+		if (LITTLE_ENDIAN) {
+			putShort(index, Short.reverseBytes(value));
+		} else {
+			putShort(index, value);
+		}
+	}
+
 	/**
 	 * Reads an int value (32bit, 4 bytes) from the given position, in the system's native byte order.
 	 * This method offers the best speed for integer reading and should be used
@@ -353,37 +618,38 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The int value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
-	@SuppressWarnings("restriction")
 	public final int getInt(int index) {
-		if (CHECKED) {
-			if (index >= 0 && index <= this.memory.length - 4) {
-				return UNSAFE.getInt(this.memory, BASE_OFFSET + index);
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		} else {
-			return UNSAFE.getInt(this.memory, BASE_OFFSET + index);
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 4) {
+			return UNSAFE.getInt(heapMemory, pos);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
 		}
 	}
-	
+
 	/**
-	 * Reads an int value (32bit, 4 bytes) from the given position, in little endian byte order.
+	 * Reads an int value (32bit, 4 bytes) from the given position, in little-endian byte order.
 	 * This method's speed depends on the system's native byte order, and it
 	 * is possibly slower than {@link #getInt(int)}. For most cases (such as 
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getInt(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The int value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
@@ -394,18 +660,18 @@ public class MemorySegment {
 			return Integer.reverseBytes(getInt(index));
 		}
 	}
-	
+
 	/**
-	 * Reads an int value (32bit, 4 bytes) from the given position, in big endian byte order.
+	 * Reads an int value (32bit, 4 bytes) from the given position, in big-endian byte order.
 	 * This method's speed depends on the system's native byte order, and it
 	 * is possibly slower than {@link #getInt(int)}. For most cases (such as 
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getInt(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The int value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
@@ -424,26 +690,27 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The int value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
-	@SuppressWarnings("restriction")
 	public final void putInt(int index, int value) {
-		if (CHECKED) {
-			if (index >= 0 && index <= this.memory.length - 4) {
-				UNSAFE.putInt(this.memory, BASE_OFFSET + index, value);
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		} else {
-			UNSAFE.putInt(this.memory, BASE_OFFSET + index, value);
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 4) {
+			UNSAFE.putInt(heapMemory, pos, value);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
 		}
 	}
-	
+
 	/**
 	 * Writes the given int value (32bit, 4 bytes) to the given position in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -451,10 +718,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putInt(int, int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The int value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
@@ -465,7 +732,7 @@ public class MemorySegment {
 			putInt(index, Integer.reverseBytes(value));
 		}
 	}
-	
+
 	/**
 	 * Writes the given int value (32bit, 4 bytes) to the given position in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -473,10 +740,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putInt(int, int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The int value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
@@ -487,7 +754,7 @@ public class MemorySegment {
 			putInt(index, value);
 		}
 	}
-	
+
 	/**
 	 * Reads a long value (64bit, 8 bytes) from the given position, in the system's native byte order.
 	 * This method offers the best speed for long integer reading and should be used
@@ -495,26 +762,27 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
-	@SuppressWarnings("restriction")
 	public final long getLong(int index) {
-		if (CHECKED) {
-			if (index >= 0 && index <= this.memory.length - 8) {
-				return UNSAFE.getLong(this.memory, BASE_OFFSET + index);
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		} else {
-			return UNSAFE.getLong(this.memory, BASE_OFFSET + index);
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 8) {
+			return UNSAFE.getLong(heapMemory, pos);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
 		}
 	}
-	
+
 	/**
 	 * Reads a long integer value (64bit, 8 bytes) from the given position, in little endian byte order.
 	 * This method's speed depends on the system's native byte order, and it
@@ -522,10 +790,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getLong(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -536,7 +804,7 @@ public class MemorySegment {
 			return Long.reverseBytes(getLong(index));
 		}
 	}
-	
+
 	/**
 	 * Reads a long integer value (64bit, 8 bytes) from the given position, in big endian byte order.
 	 * This method's speed depends on the system's native byte order, and it
@@ -544,10 +812,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getLong(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -566,26 +834,27 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
-	@SuppressWarnings("restriction")
 	public final void putLong(int index, long value) {
-		if (CHECKED) {
-			if (index >= 0 && index <= this.memory.length - 8) {
-				UNSAFE.putLong(this.memory, BASE_OFFSET + index, value);
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		} else {
-			UNSAFE.putLong(this.memory, BASE_OFFSET + index, value);
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - 8) {
+			UNSAFE.putLong(heapMemory, pos, value);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
 		}
 	}
-	
+
 	/**
 	 * Writes the given long value (64bit, 8 bytes) to the given position in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -593,10 +862,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putLong(int, long)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -607,7 +876,7 @@ public class MemorySegment {
 			putLong(index, Long.reverseBytes(value));
 		}
 	}
-	
+
 	/**
 	 * Writes the given long value (64bit, 8 bytes) to the given position in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -615,10 +884,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putLong(int, long)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -629,7 +898,7 @@ public class MemorySegment {
 			putLong(index, value);
 		}
 	}
-	
+
 	/**
 	 * Reads a single-precision floating point value (32bit, 4 bytes) from the given position, in the system's
 	 * native byte order. This method offers the best speed for float reading and should be used
@@ -637,17 +906,17 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The float value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
 	public final float getFloat(int index) {
 		return Float.intBitsToFloat(getInt(index));
 	}
-	
+
 	/**
 	 * Reads a single-precision floating point value (32bit, 4 bytes) from the given position, in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -655,17 +924,17 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getFloat(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final float getFloatLittleEndian(int index) {
 		return Float.intBitsToFloat(getIntLittleEndian(index));
 	}
-	
+
 	/**
 	 * Reads a single-precision floating point value (32bit, 4 bytes) from the given position, in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -673,10 +942,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getFloat(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -691,17 +960,17 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The float value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 4.
 	 */
 	public final void putFloat(int index, float value) {
 		putInt(index, Float.floatToRawIntBits(value));
 	}
-	
+
 	/**
 	 * Writes the given single-precision float value (32bit, 4 bytes) to the given position in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -709,17 +978,17 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putFloat(int, float)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final void putFloatLittleEndian(int index, float value) {
 		putIntLittleEndian(index, Float.floatToRawIntBits(value));
 	}
-	
+
 	/**
 	 * Writes the given single-precision float value (32bit, 4 bytes) to the given position in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -727,17 +996,17 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putFloat(int, float)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final void putFloatBigEndian(int index, float value) {
 		putIntBigEndian(index, Float.floatToRawIntBits(value));
 	}
-	
+
 	/**
 	 * Reads a double-precision floating point value (64bit, 8 bytes) from the given position, in the system's
 	 * native byte order. This method offers the best speed for double reading and should be used
@@ -745,17 +1014,17 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The double value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final double getDouble(int index) {
 		return Double.longBitsToDouble(getLong(index));
 	}
-	
+
 	/**
 	 * Reads a double-precision floating point value (64bit, 8 bytes) from the given position, in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -763,17 +1032,17 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getDouble(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final double getDoubleLittleEndian(int index) {
 		return Double.longBitsToDouble(getLongLittleEndian(index));
 	}
-	
+
 	/**
 	 * Reads a double-precision floating point value (64bit, 8 bytes) from the given position, in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -781,10 +1050,10 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #getDouble(int)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position from which the value will be read.
 	 * @return The long value at the given position.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
@@ -799,17 +1068,17 @@ public class MemorySegment {
 	 * byte order in which the value is written is the same as the one in which it is read 
 	 * (such as transient storage in memory, or serialization for I/O and network), making this
 	 * method the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the memory will be written.
 	 * @param value The double value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final void putDouble(int index, double value) {
 		putLong(index, Double.doubleToRawLongBits(value));
 	}
-	
+
 	/**
 	 * Writes the given double-precision floating-point value (64bit, 8 bytes) to the given position in little endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -817,17 +1086,17 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putDouble(int, double)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final void putDoubleLittleEndian(int index, double value) {
 		putLongLittleEndian(index, Double.doubleToRawLongBits(value));
 	}
-	
+
 	/**
 	 * Writes the given double-precision floating-point value (64bit, 8 bytes) to the given position in big endian
 	 * byte order. This method's speed depends on the system's native byte order, and it
@@ -835,70 +1104,53 @@ public class MemorySegment {
 	 * transient storage in memory or serialization for I/O and network),
 	 * it suffices to know that the byte order in which the value is written is the same as the
 	 * one in which it is read, and {@link #putDouble(int, double)} is the preferable choice.
-	 * 
+	 *
 	 * @param index The position at which the value will be written.
 	 * @param value The long value to be written.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException Thrown, if the index is negative, or larger then the segment
 	 *                                   size minus 8.
 	 */
 	public final void putDoubleBigEndian(int index, double value) {
 		putLongBigEndian(index, Double.doubleToRawLongBits(value));
 	}
-	
+
 	// -------------------------------------------------------------------------
 	//                     Bulk Read and Write Methods
 	// -------------------------------------------------------------------------
-	
-	/**
-	 * Bulk get method. Copies length memory from the specified offset to the
-	 * provided <tt>DataOutput</tt>.
-	 * 
-	 * @param out The data output object to copy the data to.
-	 * @param offset The first byte to by copied.
-	 * @param length The number of bytes to copy.
-	 * 
-	 * @throws IOException Thrown, if the DataOutput encountered a problem upon writing.
-	 */
-	public final void get(DataOutput out, int offset, int length) throws IOException {
-		out.write(this.memory, offset, length);
-	}
+
+	public abstract void get(DataOutput out, int offset, int length) throws IOException;
 
 	/**
 	 * Bulk put method. Copies length memory from the given DataInput to the
 	 * memory starting at position offset.
-	 * 
+	 *
 	 * @param in The DataInput to get the data from.
 	 * @param offset The position in the memory segment to copy the chunk to.
 	 * @param length The number of bytes to get. 
-	 * 
+	 *
 	 * @throws IOException Thrown, if the DataInput encountered a problem upon reading,
 	 *                     such as an End-Of-File.
 	 */
-	public final void put(DataInput in, int offset, int length) throws IOException {
-		in.readFully(this.memory, offset, length);
-	}
-	
+	public abstract void put(DataInput in, int offset, int length) throws IOException;
+
 	/**
 	 * Bulk get method. Copies {@code numBytes} bytes from this memory segment, starting at position
 	 * {@code offset} to the target {@code ByteBuffer}. The bytes will be put into the target buffer
 	 * starting at the buffer's current position. If this method attempts to write more bytes than
 	 * the target byte buffer has remaining (with respect to {@link ByteBuffer#remaining()}),
 	 * this method will cause a {@link java.nio.BufferOverflowException}.
-	 * 
+	 *
 	 * @param offset The position where the bytes are started to be read from in this memory segment.
 	 * @param target The ByteBuffer to copy the bytes to.
 	 * @param numBytes The number of bytes to copy.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException If the offset is invalid, or this segment does not
 	 *           contain the given number of bytes (starting from offset), or the target byte buffer does
 	 *           not have enough space for the bytes.
 	 */
-	public final void get(int offset, ByteBuffer target, int numBytes) {
-		// ByteBuffer performs the boundy checks
-		target.put(this.memory, offset, numBytes);
-	}
-	
+	public abstract void get(int offset, ByteBuffer target, int numBytes);
+
 	/**
 	 * Bulk put method. Copies {@code numBytes} bytes from the given {@code ByteBuffer}, into
 	 * this memory segment. The bytes will be read from the target buffer
@@ -907,68 +1159,132 @@ public class MemorySegment {
 	 * If this method attempts to read more bytes than
 	 * the target byte buffer has remaining (with respect to {@link ByteBuffer#remaining()}),
 	 * this method will cause a {@link java.nio.BufferUnderflowException}.
-	 * 
+	 *
 	 * @param offset The position where the bytes are started to be written to in this memory segment.
 	 * @param source The ByteBuffer to copy the bytes from.
 	 * @param numBytes The number of bytes to copy.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException If the offset is invalid, or the source buffer does not
 	 *           contain the given number of bytes, or this segment does
 	 *           not have enough space for the bytes (counting from offset).
 	 */
-	public final void put(int offset, ByteBuffer source, int numBytes) {
-		// ByteBuffer performs the boundy checks
-		source.get(this.memory, offset, numBytes);
-	}
-	
+	public abstract void put(int offset, ByteBuffer source, int numBytes);
+
 	/**
 	 * Bulk copy method. Copies {@code numBytes} bytes from this memory segment, starting at position
 	 * {@code offset} to the target memory segment. The bytes will be put into the target segment
 	 * starting at position {@code targetOffset}.
-	 * 
+	 *
 	 * @param offset The position where the bytes are started to be read from in this memory segment.
 	 * @param target The memory segment to copy the bytes to.
 	 * @param targetOffset The position in the target memory segment to copy the chunk to.
 	 * @param numBytes The number of bytes to copy.
-	 * 
+	 *
 	 * @throws IndexOutOfBoundsException If either of the offsets is invalid, or the source segment does not
 	 *           contain the given number of bytes (starting from offset), or the target segment does
 	 *           not have enough space for the bytes (counting from targetOffset).
 	 */
 	public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(this.memory, offset, target.memory, targetOffset, numBytes);
+		final byte[] thisHeapRef = this.heapMemory;
+		final byte[] otherHeapRef = target.heapMemory;
+		final long thisPointer = this.address + offset;
+		final long otherPointer = target.address + targetOffset;
+
+		if ( (numBytes | offset | targetOffset) >= 0 &&
+				thisPointer <= this.addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes)
+		{
+			UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
+		}
+		else if (this.address > this.addressLimit) {
+			throw new IllegalStateException("this memory segment has been freed.");
+		}
+		else if (target.address > target.addressLimit) {
+			throw new IllegalStateException("target memory segment has been freed.");
+		}
+		else {
+			throw new IndexOutOfBoundsException(
+					String.format("offset=%d, targetOffset=%d, numBytes=%d, address=%d, targetAddress=%d",
+					offset, targetOffset, numBytes, this.address, target.address));
+		}
 	}
-	
+
 	// -------------------------------------------------------------------------
 	//                      Comparisons & Swapping
 	// -------------------------------------------------------------------------
-	
-	public static final int compare(MemorySegment seg1, MemorySegment seg2, int offset1, int offset2, int len) {
-		final byte[] b1 = seg1.memory;
-		final byte[] b2 = seg2.memory;
-		
-		int val = 0;
-		for (int pos = 0; pos < len && (val = (b1[offset1 + pos] & 0xff) - (b2[offset2 + pos] & 0xff)) == 0; pos++);
-		return val;
-	}
-	
-	public static final void swapBytes(MemorySegment seg1, MemorySegment seg2, byte[] tempBuffer, int offset1, int offset2, int len) {
-		// system arraycopy does the boundary checks anyways, no need to check extra
-		System.arraycopy(seg1.memory, offset1, tempBuffer, 0, len);
-		System.arraycopy(seg2.memory, offset2, seg1.memory, offset1, len);
-		System.arraycopy(tempBuffer, 0, seg2.memory, offset2, len);
+
+	/**
+	 * Compares two memory segment regions.
+	 *
+	 * @param seg2 Segment to compare this segment with
+	 * @param offset1 Offset of this segment to start comparing
+	 * @param offset2 Offset of seg2 to start comparing
+	 * @param len Length of the compared memory region
+	 *
+	 * @return 0 if equal, -1 if seg1 < seg2, 1 otherwise
+	 */
+	public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
+		while (len >= 8) {
+			long l1 = this.getLongBigEndian(offset1);
+			long l2 = seg2.getLongBigEndian(offset2);
+
+			if (l1 != l2) {
+				return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
+			}
+
+			offset1 += 8;
+			offset2 += 8;
+			len -= 8;
+		}
+		while (len > 0) {
+			int b1 = this.get(offset1) & 0xff;
+			int b2 = seg2.get(offset2) & 0xff;
+			int cmp = b1 - b2;
+			if (cmp != 0) {
+				return cmp;
+			}
+			offset1++;
+			offset2++;
+			len--;
+		}
+		return 0;
 	}
+
+	/**
+	 * Swaps bytes between two memory segments, using the given auxiliary buffer.
+	 *
+	 * @param tempBuffer The auxiliary buffer in which to put data during triangle swap.
+	 * @param seg2 Segment to swap bytes with
+	 * @param offset1 Offset of this segment to start swapping
+	 * @param offset2 Offset of seg2 to start swapping
+	 * @param len Length of the swapped memory region
+	 */
+	public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len) {
+		if ( (offset1 | offset2 | len | (tempBuffer.length - len) ) >= 0) {
+			final long thisPos = this.address + offset1;
+			final long otherPos = seg2.address + offset2;
+			
+			if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
+				// this -> temp buffer
+				UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, BYTE_ARRAY_BASE_OFFSET, len);
 	
-	// --------------------------------------------------------------------------------------------
-	//                     Utilities for native memory accesses and checks
-	// --------------------------------------------------------------------------------------------
-	
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-	
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+				// other -> this
+				UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);
 	
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+				// temp buffer -> other
+				UNSAFE.copyMemory(tempBuffer, BYTE_ARRAY_BASE_OFFSET, seg2.heapMemory, otherPos, len);
+				return;
+			}
+			else if (this.address > this.addressLimit) {
+				throw new IllegalStateException("this memory segment has been freed.");
+			}
+			else if (seg2.address > seg2.addressLimit) {
+				throw new IllegalStateException("other memory segment has been freed.");
+			}
+		}
+		
+		// index is in fact invalid
+		throw new IndexOutOfBoundsException(
+					String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d",
+							offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
new file mode 100644
index 0000000..0e4e469
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ * <p>
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+	/** The factory to use */
+	private static volatile Factory factory;
+	
+	/**
+	 * Creates a new memory segment that targets the given heap memory region.
+	 * This method should be used to turn short lived byte arrays into memory segments.
+	 *
+	 * @param buffer The heap memory region.
+	 * @return A new memory segment that targets the given heap memory region.
+	 */
+	public static MemorySegment wrap(byte[] buffer) {
+		ensureInitialized();
+		return factory.wrap(buffer);
+	}
+
+	/**
+	 * Allocates some unpooled memory and creates a new memory segment that represents
+	 * that memory.
+	 * <p>
+	 * This method is similar to {@link #allocateUnpooledSegment(int, Object)}, but the
+	 * memory segment will have null as the owner.
+	 *
+	 * @param size The size of the memory segment to allocate.
+	 * @return A new memory segment, backed by unpooled heap memory.
+	 */
+	public static MemorySegment allocateUnpooledSegment(int size) {
+		return allocateUnpooledSegment(size, null);
+	}
+
+	/**
+	 * Allocates some unpooled memory and creates a new memory segment that represents
+	 * that memory.
+	 * <p>
+	 * This method is similar to {@link #allocateUnpooledSegment(int)}, but additionally sets
+	 * the owner of the memory segment.
+	 * 
+	 * @param size The size of the memory segment to allocate.
+	 * @param owner The owner to associate with the memory segment.
+	 * @return A new memory segment, backed by unpooled heap memory.
+	 */
+	public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
+		ensureInitialized();
+		return factory.allocateUnpooledSegment(size, owner);
+	}
+
+	/**
+	 * Creates a memory segment that wraps the given byte array.
+	 * <p>
+	 * This method is intended to be used for components which pool memory and create
+	 * memory segments around long-lived memory regions.
+	 *
+	 * 
+	 * @param memory The heap memory to be represented by the memory segment.
+	 * @param owner The owner to associate with the memory segment.
+	 * @return A new memory segment representing the given heap memory.
+	 */
+	public static MemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
+		ensureInitialized();
+		return factory.wrapPooledHeapMemory(memory, owner);
+	}
+
+	/**
+	 * Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer.
+	 * Note that the ByteBuffer needs to be a <i>direct ByteBuffer</i>. 
+	 * <p>
+	 * This method is intended to be used for components which pool memory and create
+	 * memory segments around long-lived memory regions.
+	 *
+	 * @param memory The byte buffer with the off-heap memory to be represented by the memory segment.
+	 * @param owner The owner to associate with the memory segment.
+	 * @return A new memory segment representing the given off-heap memory.
+	 */
+	public static MemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
+		ensureInitialized();
+		return factory.wrapPooledOffHeapMemory(memory, owner);
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Initializes this factory with the given concrete factory.
+	 * 
+	 * @param f The concrete factory to use.
+	 * @throws java.lang.IllegalStateException Thrown, if this factory has been initialized before.
+	 */
+	public static void initializeFactory(Factory f) {
+		if (f == null) {
+			throw new NullPointerException();
+		}
+	
+		synchronized (MemorySegmentFactory.class) {
+			if (factory == null) {
+				factory = f;
+			}
+			else {
+				throw new IllegalStateException("Factory has already been initialized");
+			}
+		}
+	}
+
+	/**
+	 * Checks whether this memory segment factory has been initialized (with a type to produce).
+	 * 
+	 * @return True, if the factory has been initialized, false otherwise.
+	 */
+	public static boolean isInitialized() {
+		return factory != null;
+	}
+
+	/**
+	 * Gets the factory. May return null, if the factory has not been initialized.
+	 * 
+	 * @return The factory, or null, if the factory has not been initialized.
+	 */
+	public static Factory getFactory() {
+		return factory;
+	}
+	
+	private static void ensureInitialized() {
+		if (factory == null) {
+			factory = HeapMemorySegment.FACTORY;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal factory
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A concrete factory for memory segments.
+	 */
+	public static interface Factory {
+
+		/**
+		 * Creates a new memory segment that targets the given heap memory region.
+		 *
+		 * @param memory The heap memory region.
+		 * @return A new memory segment that targets the given heap memory region.
+		 */
+		MemorySegment wrap(byte[] memory);
+
+		/**
+		 * Allocates some unpooled memory and creates a new memory segment that represents
+		 * that memory.
+		 *
+		 * @param size The size of the memory segment to allocate.
+		 * @param owner The owner to associate with the memory segment.
+		 * @return A new memory segment, backed by unpooled heap memory.
+		 */
+		MemorySegment allocateUnpooledSegment(int size, Object owner);
+
+		/**
+		 * Creates a memory segment that wraps the given byte array.
+		 * <p>
+		 * This method is intended to be used for components which pool memory and create
+		 * memory segments around long-lived memory regions.
+		 *
+		 *
+		 * @param memory The heap memory to be represented by the memory segment.
+		 * @param owner The owner to associate with the memory segment.
+		 * @return A new memory segment representing the given heap memory.
+		 */
+		MemorySegment wrapPooledHeapMemory(byte[] memory, Object owner);
+
+		/**
+		 * Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer.
+		 * Note that the ByteBuffer needs to be a <i>direct ByteBuffer</i>. 
+		 * <p>
+		 * This method is intended to be used for components which pool memory and create
+		 * memory segments around long-lived memory regions.
+		 *
+		 * @param memory The byte buffer with the off-heap memory to be represented by the memory segment.
+		 * @param owner The owner to associate with the memory segment.
+		 * @return A new memory segment representing the given off-heap memory.
+		 */
+		MemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
new file mode 100644
index 0000000..5b2b5d3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+/**
+ * The class of memory, such as heap or off-heap.
+ */
+public enum MemoryType {
+
+	/**
+	 * Denotes memory that is part of the Java heap.
+	 */
+	HEAP,
+
+	/**
+	 * Denotes memory that is outside the Java heap (but still part of tha Java process).
+	 */
+	OFF_HEAP
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
index c1f626f..20b37c9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
@@ -22,22 +22,17 @@ import java.lang.reflect.Field;
 import java.nio.ByteOrder;
 
 /**
- * Utility class for native (unsafe) memory accesses.
+ * Utility class for memory operations.
  */
 public class MemoryUtils {
 	
-	/**
-	 * The "unsafe", which can be used to perform native memory accesses.
-	 */
+	/** The "unsafe", which can be used to perform native memory accesses. */
 	@SuppressWarnings("restriction")
 	public static final sun.misc.Unsafe UNSAFE = getUnsafe();
 	
-	/**
-	 * The native byte order of the platform on which the system currently runs.
-	 */
-	public static final ByteOrder NATIVE_BYTE_ORDER = getByteOrder();
-	
-	
+	/** The native byte order of the platform on which the system currently runs. */
+	public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
+
 	@SuppressWarnings("restriction")
 	private static sun.misc.Unsafe getUnsafe() {
 		try {
@@ -45,21 +40,18 @@ public class MemoryUtils {
 			unsafeField.setAccessible(true);
 			return (sun.misc.Unsafe) unsafeField.get(null);
 		} catch (SecurityException e) {
-			throw new RuntimeException("Could not access the unsafe handle.", e);
+			throw new RuntimeException("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e);
 		} catch (NoSuchFieldException e) {
-			throw new RuntimeException("The static unsafe handle field was not be found.");
+			throw new RuntimeException("The static handle field in sun.misc.Unsafe was not found.");
 		} catch (IllegalArgumentException e) {
-			throw new RuntimeException("Bug: Illegal argument reflection access for static field.");
+			throw new RuntimeException("Bug: Illegal argument reflection access for static field.", e);
 		} catch (IllegalAccessException e) {
-			throw new RuntimeException("Access to the unsafe handle is forbidden by the runtime.", e);
+			throw new RuntimeException("Access to sun.misc.Unsafe is forbidden by the runtime.", e);
+		} catch (Throwable t) {
+			throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
 		}
 	}
-	
-	@SuppressWarnings("restriction")
-	private static ByteOrder getByteOrder() {
-		return ByteOrder.nativeOrder();
-	}
-	
-	
+
+	/** Should not be instantiated */
 	private MemoryUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index 782c402..a8ace92 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -29,7 +29,9 @@ import static org.junit.Assert.*;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -238,7 +240,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
 	
 	// Help Function for setting up a memory segment and normalize the keys of the data array in it
 	public MemorySegment setupNormalizedKeysMemSegment(T[] data, int normKeyLen, TypeComparator<T> comparator) {
-		MemorySegment memSeg = new MemorySegment(new byte[2048]);
+		MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment(2048);
 
 		// Setup normalized Keys in the memory segment
 		int offset = 0;
@@ -294,7 +296,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
 			MemorySegment memSeg2 = setupNormalizedKeysMemSegment(data, normKeyLen, comparator);
 
 			for (int i = 0; i < data.length; i++) {
-				assertTrue(MemorySegment.compare(memSeg1, memSeg2, i * normKeyLen, i * normKeyLen, normKeyLen) == 0);
+				assertTrue(memSeg1.compare(memSeg2, i * normKeyLen, i * normKeyLen, normKeyLen) == 0);
 			}
 		} catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -343,14 +345,14 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
 				for (int h = l + 1; h < data.length; h++) {
 					int cmp;
 					if (greater) {
-						cmp = MemorySegment.compare(memSegLow, memSegHigh, l * normKeyLen, h * normKeyLen, normKeyLen);
+						cmp = memSegLow.compare(memSegHigh, l * normKeyLen, h * normKeyLen, normKeyLen);
 						if (fullyDetermines) {
 							assertTrue(cmp < 0);
 						} else {
 							assertTrue(cmp <= 0);
 						}
 					} else {
-						cmp = MemorySegment.compare(memSegHigh, memSegLow, h * normKeyLen, l * normKeyLen, normKeyLen);
+						cmp = memSegHigh.compare(memSegLow, h * normKeyLen, l * normKeyLen, normKeyLen);
 						if (fullyDetermines) {
 							assertTrue(cmp > 0);
 						} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
new file mode 100644
index 0000000..724a366
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CrossSegmentTypeTest {
+
+	private final int pageSize = 32*1024;
+	
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCompareBytesMixedSegments() {
+		try {
+			MemorySegment[] segs1 = {
+					new HeapMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+			};
+
+			MemorySegment[] segs2 = {
+					new HeapMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+			};
+		
+			Random rnd = new Random();
+			
+			for (MemorySegment seg1 : segs1) {
+				for (MemorySegment seg2 : segs2) {
+					testCompare(seg1, seg2, rnd);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private void testCompare(MemorySegment seg1, MemorySegment seg2, Random random) {
+		assertEquals(pageSize, seg1.size());
+		assertEquals(pageSize, seg2.size());
+		
+		final byte[] bytes1 = new byte[pageSize];
+		final byte[] bytes2 = new byte[pageSize];
+
+		final int stride = pageSize / 255;
+		final int shift = 16666;
+
+		for (int i = 0; i < pageSize; i++) {
+			byte val = (byte) ((i / stride) & 0xff);
+			bytes1[i] = val;
+
+			if (i + shift < bytes2.length) {
+				bytes2[i + shift] = val;
+			}
+		}
+		
+		seg1.put(0, bytes1);
+		seg2.put(0, bytes2);
+
+		for (int i = 0; i < 1000; i++) {
+			int pos1 = random.nextInt(bytes1.length);
+			int pos2 = random.nextInt(bytes2.length);
+
+			int len = Math.min(Math.min(bytes1.length - pos1, bytes2.length - pos2),
+					random.nextInt(pageSize / 50 ));
+
+			int cmp = seg1.compare(seg2, pos1, pos2, len);
+
+			if (pos1 < pos2 - shift) {
+				assertTrue(cmp <= 0);
+			}
+			else {
+				assertTrue(cmp >= 0);
+			}
+		}
+	}
+
+
+	@Test
+	public void testSwapBytesMixedSegments() {
+		try {
+			final int HALF_SIZE = pageSize / 2;
+			
+			MemorySegment[] segs1 = {
+					new HeapMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+			};
+
+			MemorySegment[] segs2 = {
+					new HeapMemorySegment(new byte[HALF_SIZE]),
+					new HybridMemorySegment(new byte[HALF_SIZE]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(HALF_SIZE))
+			};
+
+			Random rnd = new Random();
+
+			for (MemorySegment seg1 : segs1) {
+				for (MemorySegment seg2 : segs2) {
+					testSwap(seg1, seg2, rnd, HALF_SIZE);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private void testSwap(MemorySegment seg1, MemorySegment seg2, Random random, int smallerSize) {
+		assertEquals(pageSize, seg1.size());
+		assertEquals(smallerSize, seg2.size());
+
+		final byte[] bytes1 = new byte[pageSize];
+		final byte[] bytes2 = new byte[smallerSize];
+
+		Arrays.fill(bytes2, (byte) 1);
+		
+		seg1.put(0, bytes1);
+		seg2.put(0, bytes2);
+
+		// wap the second half of the first segment with the second segment
+
+		int pos = 0;
+		while (pos < smallerSize) {
+			int len = random.nextInt(pageSize / 40);
+			len = Math.min(len, smallerSize - pos);
+			seg1.swapBytes(new byte[len], seg2, pos + smallerSize, pos, len);
+			pos += len;
+		}
+
+		// the second segment should now be all zeros, the first segment should have one in its second half
+
+		for (int i = 0; i < smallerSize; i++) {
+			assertEquals((byte) 0, seg1.get(i));
+			assertEquals((byte) 0, seg2.get(i));
+			assertEquals((byte) 1, seg1.get(i + smallerSize));
+		}
+	}
+
+	@Test
+	public void testCopyMixedSegments() {
+		try {
+			MemorySegment[] segs1 = {
+					new HeapMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+			};
+
+			MemorySegment[] segs2 = {
+					new HeapMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(new byte[pageSize]),
+					new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+			};
+
+			Random rnd = new Random();
+
+			for (MemorySegment seg1 : segs1) {
+				for (MemorySegment seg2 : segs2) {
+					testCopy(seg1, seg2, rnd);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private void testCopy(MemorySegment seg1, MemorySegment seg2, Random random) {
+		assertEquals(pageSize, seg1.size());
+		assertEquals(pageSize, seg2.size());
+
+		byte[] expected = new byte[pageSize];
+		byte[] actual = new byte[pageSize];
+		
+		// zero out the memory
+		seg1.put(0, expected);
+		seg2.put(0, expected);
+		
+		for (int i = 0; i < 40; i++) {
+			int numBytes = random.nextInt(pageSize / 20);
+			byte[] bytes = new byte[numBytes];
+			random.nextBytes(bytes);
+			
+			int thisPos = random.nextInt(pageSize - numBytes);
+			int otherPos = random.nextInt(pageSize - numBytes);
+			
+			// track what we expect
+			System.arraycopy(bytes, 0, expected, otherPos, numBytes);
+			
+			seg1.put(thisPos, bytes);
+			seg1.copyTo(thisPos, seg2, otherPos, numBytes);
+		}
+		
+		seg2.get(0, actual);
+		assertArrayEquals(expected, actual);
+		
+		// test out of bound conditions
+
+		final int[] validOffsets = { 0, 1, pageSize / 10 * 9 };
+		final int[] invalidOffsets = { -1, pageSize + 1, -pageSize, Integer.MAX_VALUE, Integer.MIN_VALUE };
+
+		final int[] validLengths = { 0, 1, pageSize / 10, pageSize };
+		final int[] invalidLengths = { -1, -pageSize, pageSize + 1, Integer.MAX_VALUE, Integer.MIN_VALUE };
+
+		for (int off1 : validOffsets) {
+			for (int off2 : validOffsets) {
+				for (int len : invalidLengths) {
+					try {
+						seg1.copyTo(off1, seg2, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg1.copyTo(off2, seg2, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+					
+					try {
+						seg2.copyTo(off1, seg1, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off2, seg1, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+				}
+			}
+		}
+
+		for (int off1 : validOffsets) {
+			for (int off2 : invalidOffsets) {
+				for (int len : validLengths) {
+					try {
+						seg1.copyTo(off1, seg2, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg1.copyTo(off2, seg2, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off1, seg1, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off2, seg1, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+				}
+			}
+		}
+
+		for (int off1 : invalidOffsets) {
+			for (int off2 : validOffsets) {
+				for (int len : validLengths) {
+					try {
+						seg1.copyTo(off1, seg2, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg1.copyTo(off2, seg2, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off1, seg1, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off2, seg1, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+				}
+			}
+		}
+
+		for (int off1 : invalidOffsets) {
+			for (int off2 : invalidOffsets) {
+				for (int len : validLengths) {
+					try {
+						seg1.copyTo(off1, seg2, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg1.copyTo(off2, seg2, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off1, seg1, off2, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+
+					try {
+						seg2.copyTo(off2, seg1, off1, len);
+						fail("should fail with an IndexOutOfBoundsException");
+					}
+					catch (IndexOutOfBoundsException ignored) {}
+				}
+			}
+		}
+	}
+}