You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:59:01 UTC

[11/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant of the managed memory

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/MemorySegmentSpeedBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/MemorySegmentSpeedBenchmark.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/MemorySegmentSpeedBenchmark.java
new file mode 100644
index 0000000..454c821
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/MemorySegmentSpeedBenchmark.java
@@ -0,0 +1,1633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory.benchmarks;
+
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+@SuppressWarnings("ConstantConditions")
+public class MemorySegmentSpeedBenchmark {
+	
+	/** Constant written by the long-put benchmarks ({@code L} suffix: a lowercase {@code l} reads like {@code 1}). */
+	private static final long LONG_VALUE = 0x1234567890abcdefL;
+	
+	/** Whether the core {@link HeapMemorySegment} variant is included in each benchmark. */
+	private static final boolean TEST_CORE_ON_HEAP = true;
+	/** Whether the core {@link HybridMemorySegment} variants (heap- and off-heap-backed) are included. */
+	private static final boolean TEST_CORE_OFF_HEAP = false;
+	
+	/**
+	 * Sink for the values summed by the get-benchmarks. Publishing them to a public static field
+	 * keeps the JIT from eliminating the measured read loops as dead code.
+	 */
+	public static long sideEffect = 0L;
+	
+	
+	/**
+	 * Entry point. Allocates a small (32 KiB) and a large (1 GiB) memory region, each once as a
+	 * heap array and once as a direct {@link ByteBuffer}. It then runs the single-byte, byte-array,
+	 * long and int benchmarks against the small region first and the large region second.
+	 *
+	 * <p>The JVM needs enough heap and direct memory for the two 1 GiB regions.
+	 *
+	 * @param args ignored
+	 */
+	public static void main(String[] args) {
+		// region sizes in bytes
+		final int smallSize = 32 * 1024;
+		final int largeSize = 1024 * 1024 * 1024;
+
+		// timed passes per run for each region size
+		final int smallRounds = 100000;
+		final int largeRounds = 10;
+
+		// allocation order: large heap, small heap, large direct, small direct
+		final byte[] largeHeap = new byte[largeSize];
+		final byte[] smallHeap = new byte[smallSize];
+		final ByteBuffer largeDirect = ByteBuffer.allocateDirect(largeSize);
+		final ByteBuffer smallDirect = ByteBuffer.allocateDirect(smallSize);
+
+		System.out.println("testing access of individual bytes");
+		testPutBytes(smallHeap, smallDirect, smallSize, smallRounds);
+		testGetBytes(smallHeap, smallDirect, smallSize, smallRounds);
+		testPutBytes(largeHeap, largeDirect, largeSize, largeRounds);
+		testGetBytes(largeHeap, largeDirect, largeSize, largeRounds);
+
+		// byte-array benchmarks move 1024-byte chunks, so the value count is size / 1024
+		System.out.println("testing access of byte arrays");
+		testPutByteArrays1024(smallHeap, smallDirect, smallSize / 1024, smallRounds);
+		testGetByteArrays1024(smallHeap, smallDirect, smallSize / 1024, smallRounds);
+		testPutByteArrays1024(largeHeap, largeDirect, largeSize / 1024, largeRounds);
+		testGetByteArrays1024(largeHeap, largeDirect, largeSize / 1024, largeRounds);
+
+		System.out.println("testing access of longs");
+		testPutLongs(smallHeap, smallDirect, smallSize / 8, smallRounds);
+		testGetLongs(smallHeap, smallDirect, smallSize / 8, smallRounds);
+		testPutLongs(largeHeap, largeDirect, largeSize / 8, largeRounds);
+		testGetLongs(largeHeap, largeDirect, largeSize / 8, largeRounds);
+
+		// Disabled endian-specific long benchmarks; uncomment to include them.
+//		System.out.println("testing access of big endian longs");
+//		testPutLongsBigEndian(smallHeap, smallDirect, smallSize / 8, smallRounds);
+//		testGetLongsBigEndian(smallHeap, smallDirect, smallSize / 8, smallRounds);
+//		testPutLongsBigEndian(largeHeap, largeDirect, largeSize / 8, largeRounds);
+//		testGetLongsBigEndian(largeHeap, largeDirect, largeSize / 8, largeRounds);
+//
+//		System.out.println("testing access of little endian longs");
+//		testPutLongsLittleEndian(smallHeap, smallDirect, smallSize / 8, smallRounds);
+//		testGetLongsLittleEndian(smallHeap, smallDirect, smallSize / 8, smallRounds);
+//		testPutLongsLittleEndian(largeHeap, largeDirect, largeSize / 8, largeRounds);
+//		testGetLongsLittleEndian(largeHeap, largeDirect, largeSize / 8, largeRounds);
+
+		System.out.println("testing access of ints");
+		testPutInts(smallHeap, smallDirect, smallSize / 4, smallRounds);
+		testGetInts(smallHeap, smallDirect, smallSize / 4, smallRounds);
+		testPutInts(largeHeap, largeDirect, largeSize / 4, largeRounds);
+		testGetInts(largeHeap, largeDirect, largeSize / 4, largeRounds);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                  BYTEs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Benchmarks writing individual bytes through each segment implementation and prints the
+	 * elapsed time of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} single-byte writes. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of bytes written per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testPutBytes(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+										final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timePutBytesOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timePutBytesHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timePutBytesHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timePutBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Writing %d x %d bytes to %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+
+	/**
+	 * Benchmarks reading individual bytes through each segment implementation and prints the
+	 * elapsed time of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} single-byte reads. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of bytes read per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testGetBytes(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+										final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timeGetBytesOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timeGetBytesHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timeGetBytesHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timeGetBytesAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Reading %d x %d bytes from %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each writing {@code num}
+	 * single bytes at offsets {@code 0 .. num-1}; the byte written at offset {@code i} is {@code (byte) i}.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutBytesOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, (byte) i);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each writing {@code num}
+	 * single bytes at offsets {@code 0 .. num-1}; the byte written at offset {@code i} is {@code (byte) i}.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutBytesOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, (byte) i);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes over a {@link PureHybridMemorySegment}, each writing {@code num}
+	 * single bytes at offsets {@code 0 .. num-1}; the byte written at offset {@code i} is {@code (byte) i}.
+	 * The drivers call this with segments backed by a heap array and by a direct ByteBuffer.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutBytesHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, (byte) i);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes that each write {@code num} single bytes at offsets
+	 * {@code 0 .. num-1}, issuing every call through the {@link MemorySegment} base type; the byte
+	 * written at offset {@code i} is {@code (byte) i}.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutBytesAbstract(final MemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, (byte) i);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each reading {@code num}
+	 * single bytes at offsets {@code 0 .. num-1} and summing them.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetBytesOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.get(offset);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each reading {@code num}
+	 * single bytes at offsets {@code 0 .. num-1} and summing them.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetBytesOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.get(offset);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes over a {@link PureHybridMemorySegment}, each reading {@code num}
+	 * single bytes at offsets {@code 0 .. num-1} and summing them. The drivers call this with
+	 * segments backed by a heap array and by a direct ByteBuffer.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetBytesHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.get(offset);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+
+
+	/**
+	 * Times {@code rounds} passes that each read {@code num} single bytes at offsets
+	 * {@code 0 .. num-1} through the {@link MemorySegment} base type, summing the values read.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code num} bytes long)
+	 * @param num number of bytes read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetBytesAbstract(final MemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.get(offset);
+				offset++;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                  LONGs
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Benchmarks writing longs through each segment implementation and prints the elapsed time
+	 * of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} long writes. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of longs written per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testPutLongs(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+										final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timePutLongsOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timePutLongsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timePutLongsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timePutLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Writing %d x %d longs to %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	/**
+	 * Benchmarks reading longs through each segment implementation and prints the elapsed time
+	 * of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} long reads. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of longs read per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testGetLongs(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+										final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timeGetLongsOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timeGetLongsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timeGetLongsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timeGetLongsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Reading %d x %d longs from %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each writing
+	 * {@link #LONG_VALUE} {@code num} times at consecutive 8-byte offsets starting at 0.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutLongsOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putLong(offset, LONG_VALUE);
+				// advance by the width of a long so writes do not overlap
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each writing
+	 * {@link #LONG_VALUE} {@code num} times at consecutive 8-byte offsets starting at 0.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutLongsOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putLong(offset, LONG_VALUE);
+				// advance by the width of a long so writes do not overlap
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHybridMemorySegment}, each writing
+	 * {@link #LONG_VALUE} {@code num} times at consecutive 8-byte offsets starting at 0. The drivers
+	 * call this with segments backed by a heap array and by a direct ByteBuffer.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutLongsHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putLong(offset, LONG_VALUE);
+				// advance by the width of a long so writes do not overlap
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes that each write {@link #LONG_VALUE} {@code num} times at
+	 * consecutive 8-byte offsets starting at 0, issuing every call through the
+	 * {@link MemorySegment} base type.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutLongsAbstract(final MemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putLong(offset, LONG_VALUE);
+				// advance by the width of a long so writes do not overlap
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each reading {@code num}
+	 * longs at consecutive 8-byte offsets starting at 0 and summing them.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetLongsOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getLong(offset);
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each reading {@code num}
+	 * longs at consecutive 8-byte offsets starting at 0 and summing them.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetLongsOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getLong(offset);
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHybridMemorySegment}, each reading {@code num}
+	 * longs at consecutive 8-byte offsets starting at 0 and summing them. The drivers call this
+	 * with segments backed by a heap array and by a direct ByteBuffer.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetLongsHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getLong(offset);
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes that each read {@code num} longs at consecutive 8-byte offsets
+	 * starting at 0 through the {@link MemorySegment} base type, summing the values read.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 8 * num} bytes long)
+	 * @param num number of longs read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetLongsAbstract(final MemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getLong(offset);
+				offset += 8;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                  INTs
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Benchmarks writing ints through each segment implementation and prints the elapsed time
+	 * of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} int writes. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of ints written per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testPutInts(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+									final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timePutIntsOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timePutIntsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timePutIntsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timePutIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Writing %d x %d ints to %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	/**
+	 * Benchmarks reading ints through each segment implementation and prints the elapsed time
+	 * of every variant in milliseconds.
+	 *
+	 * <p>Each runner zero-fills its backing memory, wraps it in the segment type under test and
+	 * times {@code rounds} passes of {@code numValues} int reads. Runners whose
+	 * {@code TEST_CORE_*} flag is off are replaced by {@code null}; NOTE(review): how
+	 * {@code runTestsInRandomOrder} reports those slots is not visible here.
+	 *
+	 * @param heapMemory backing array for the on-heap variants; its length is printed as the segment size
+	 * @param offHeapMemory backing buffer for the off-heap variants
+	 * @param numValues number of ints read per pass
+	 * @param rounds number of passes per timed run
+	 */
+	private static void testGetInts(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+									final int numValues, final int rounds) {
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timeGetIntsOnHeap(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timeGetIntsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timeGetIntsHybrid(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timeGetIntsAbstract(seg, numValues, rounds);
+			}
+		};
+
+		// NOTE(review): the report below presumes results[k] is the timing of tests[k]
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		// nanoseconds -> milliseconds; labels are uniformly hyphenated, no trailing separators
+		System.out.println(String.format(
+				"Reading %d x %d ints from %d bytes segment:" +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-heap=%,d msecs" +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each writing {@code num}
+	 * ints at consecutive 4-byte offsets starting at 0; the i-th int written has value {@code i}.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutIntsOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putInt(offset, i);
+				// advance by the width of an int so writes do not overlap
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each writing {@code num}
+	 * ints at consecutive 4-byte offsets starting at 0; the i-th int written has value {@code i}.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutIntsOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putInt(offset, i);
+				// advance by the width of an int so writes do not overlap
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHybridMemorySegment}, each writing {@code num}
+	 * ints at consecutive 4-byte offsets starting at 0; the i-th int written has value {@code i}.
+	 * The drivers call this with segments backed by a heap array and by a direct ByteBuffer.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutIntsHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putInt(offset, i);
+				// advance by the width of an int so writes do not overlap
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes that each write {@code num} ints at consecutive 4-byte offsets
+	 * starting at 0, issuing every call through the {@link MemorySegment} base type; the i-th int
+	 * written has value {@code i}.
+	 *
+	 * @param segment segment to write into (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints written per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timePutIntsAbstract(final MemorySegment segment, final int num, final int rounds) {
+		// checked segment -- NOTE(review): presumably this segment type bounds-checks each access; confirm
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.putInt(offset, i);
+				// advance by the width of an int so writes do not overlap
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureHeapMemorySegment}, each reading {@code num}
+	 * ints at consecutive 4-byte offsets starting at 0 and summing them.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetIntsOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+		// int accumulator: wraps on overflow, which is fine since it only feeds sideEffect
+		int l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getInt(offset);
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes over a {@link PureOffHeapMemorySegment}, each reading {@code num}
+	 * ints at consecutive 4-byte offsets starting at 0 and summing them.
+	 *
+	 * <p>NOTE(review): not called by any test driver in the visible part of this file.
+	 *
+	 * @param segment segment to read from (NOTE(review): presumably at least {@code 4 * num} bytes long)
+	 * @param num number of ints read per pass
+	 * @param rounds number of passes
+	 * @return nanoseconds elapsed between two {@link System#nanoTime()} readings around all passes
+	 */
+	private static long timeGetIntsOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+		// int accumulator: wraps on overflow, which is fine since it only feeds sideEffect
+		int l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getInt(offset);
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region so the JIT cannot drop the reads
+		sideEffect += l;
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of reading {@code num} ints from a {@link PureHybridMemorySegment}
+	 * at offsets 0, 4, 8, ... (4 bytes apart).
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * 4} bytes)
+	 * @param num number of ints read per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetIntsHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+		// running sum of every read value (int overflow is harmless here)
+		int l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getInt(offset);
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region, presumably so the JIT cannot discard the reads as dead code
+		sideEffect += l;
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes of reading {@code num} ints through the abstract {@link MemorySegment}
+	 * type at offsets 0, 4, 8, ... (4 bytes apart).
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * 4} bytes)
+	 * @param num number of ints read per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetIntsAbstract(final MemorySegment segment, final int num, final int rounds) {
+		// running sum of every read value (int overflow is harmless here)
+		int l = 0;
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				l += segment.getInt(offset);
+				offset += 4;
+			}
+		}
+		long end = System.nanoTime();
+		// publish the sum outside the timed region, presumably so the JIT cannot discard the reads as dead code
+		sideEffect += l;
+		return end - start;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                  BYTE ARRAYs
+	// --------------------------------------------------------------------------------------------
+	
+	private static void testPutByteArrays1024(final byte[] heapMemory, final ByteBuffer offHeapMemory, 
+												final int numValues, final int rounds) {
+		
+		final byte[] sourceArray = new byte[1024];
+		for (int i = 0; i < sourceArray.length; i++) {
+			sourceArray[i] = (byte) i;
+		}
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timePutByteArrayOnHeap(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timePutByteArrayHybrid(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timePutByteArrayHybrid(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutByteArrayAbstract(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timePutByteArrayAbstract(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timePutByteArrayAbstract(seg, sourceArray, numValues, rounds);
+			}
+		};
+
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		System.out.println(String.format(
+				"Writing %d x %d byte[1024] to %d bytes segment: " +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized heap=%,d msecs, " +
+						"\n\tspecialized-hybrid-heap=%,d msecs, " +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs, ",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	private static void testGetByteArrays1024(final byte[] heapMemory, final ByteBuffer offHeapMemory,
+												final int numValues, final int rounds) {
+		
+		final byte[] targetArray = new byte[1024];
+
+		TestRunner pureHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHeapMemorySegment seg = new PureHeapMemorySegment(heapMemory);
+				return timeGetByteArrayOnHeap(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(heapMemory);
+				return timeGetByteArrayHybrid(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner pureHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				PureHybridMemorySegment seg = new PureHybridMemorySegment(offHeapMemory);
+				return timeGetByteArrayHybrid(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HeapMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetByteArrayAbstract(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOnHeap(heapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledHeapMemory(heapMemory, null);
+				return timeGetByteArrayAbstract(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner coreHybridOffHeapRunner = new TestRunner() {
+			@Override
+			public long runTest() {
+				fillOffHeap(offHeapMemory, (byte) 0);
+				MemorySegment seg = HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(offHeapMemory, null);
+				return timeGetByteArrayAbstract(seg, targetArray, numValues, rounds);
+			}
+		};
+
+		TestRunner[] tests = {
+				TEST_CORE_ON_HEAP ? coreHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridHeapRunner : null,
+				TEST_CORE_OFF_HEAP ? coreHybridOffHeapRunner : null,
+				pureHeapRunner, pureHybridHeapRunner, pureHybridOffHeapRunner
+		};
+
+		long[] results = runTestsInRandomOrder(tests, new Random(), 5, true);
+
+		System.out.println(String.format(
+				"Reading %d x %d byte[1024] from %d bytes segment: " +
+						"\n\theap=%,d msecs" +
+						"\n\thybrid-on-heap=%,d msecs" +
+						"\n\thybrid-off-heap=%,d msecs" +
+						"\n\tspecialized heap=%,d msecs, " +
+						"\n\tspecialized-hybrid-heap=%,d msecs, " +
+						"\n\tspecialized-hybrid-off-heap=%,d msecs, ",
+				rounds, numValues, heapMemory.length,
+				(results[0] / 1000000), (results[1] / 1000000), (results[2] / 1000000),
+				(results[3] / 1000000), (results[4] / 1000000), (results[5] / 1000000)));
+	}
+	
+	/**
+	 * Times {@code rounds} passes of copying the whole {@code source} array into a
+	 * {@link PureHeapMemorySegment} {@code num} times, at offsets 0, len, 2*len, ...
+	 * where len is {@code source.length}.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * source.length} bytes)
+	 * @param source the bytes written on every copy
+	 * @param num number of copies per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timePutByteArrayOnHeap(final PureHeapMemorySegment segment, final byte[] source, final int num, final int rounds) {
+		final int len = source.length;
+		
+		// loop body matches the other timePutByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, source, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of copying the whole {@code source} array into a
+	 * {@link PureOffHeapMemorySegment} {@code num} times, at offsets 0, len, 2*len, ...
+	 * where len is {@code source.length}.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * source.length} bytes)
+	 * @param source the bytes written on every copy
+	 * @param num number of copies per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timePutByteArrayOffHeap(final PureOffHeapMemorySegment segment, final byte[] source, final int num, final int rounds) {
+		final int len = source.length;
+		
+		// loop body matches the other timePutByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, source, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of copying the whole {@code source} array into a
+	 * {@link PureHybridMemorySegment} {@code num} times, at offsets 0, len, 2*len, ...
+	 * where len is {@code source.length}.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * source.length} bytes)
+	 * @param source the bytes written on every copy
+	 * @param num number of copies per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timePutByteArrayHybrid(final PureHybridMemorySegment segment, final byte[] source, final int num, final int rounds) {
+		final int len = source.length;
+		
+		// loop body matches the other timePutByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, source, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes of copying the whole {@code source} array through the abstract
+	 * {@link MemorySegment} type {@code num} times, at offsets 0, len, 2*len, ...
+	 * where len is {@code source.length}.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * source.length} bytes)
+	 * @param source the bytes written on every copy
+	 * @param num number of copies per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timePutByteArrayAbstract(final MemorySegment segment, final byte[] source, final int num, final int rounds) {
+		final int len = source.length;
+
+		// loop body matches the other timePutByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.put(offset, source, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of reading {@code target.length} bytes from a
+	 * {@link PureHeapMemorySegment} into {@code target}, {@code num} times per pass, at offsets
+	 * 0, len, 2*len, ... The target is overwritten by every read.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * target.length} bytes)
+	 * @param target the array that receives every read
+	 * @param num number of reads per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetByteArrayOnHeap(final PureHeapMemorySegment segment, final byte[] target, final int num, final int rounds) {
+		final int len = target.length;
+		
+		// loop body matches the other timeGetByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.get(offset, target, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of reading {@code target.length} bytes from a
+	 * {@link PureOffHeapMemorySegment} into {@code target}, {@code num} times per pass, at offsets
+	 * 0, len, 2*len, ... The target is overwritten by every read.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * target.length} bytes)
+	 * @param target the array that receives every read
+	 * @param num number of reads per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetByteArrayOffHeap(final PureOffHeapMemorySegment segment, final byte[] target, final int num, final int rounds) {
+		final int len = target.length;
+		
+		// loop body matches the other timeGetByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.get(offset, target, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+	
+	/**
+	 * Times {@code rounds} passes of reading {@code target.length} bytes from a
+	 * {@link PureHybridMemorySegment} into {@code target}, {@code num} times per pass, at offsets
+	 * 0, len, 2*len, ... The target is overwritten by every read.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * target.length} bytes)
+	 * @param target the array that receives every read
+	 * @param num number of reads per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetByteArrayHybrid(final PureHybridMemorySegment segment, final byte[] target, final int num, final int rounds) {
+		final int len = target.length;
+		
+		// loop body matches the other timeGetByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.get(offset, target, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+	/**
+	 * Times {@code rounds} passes of reading {@code target.length} bytes through the abstract
+	 * {@link MemorySegment} type into {@code target}, {@code num} times per pass, at offsets
+	 * 0, len, 2*len, ... The target is overwritten by every read.
+	 *
+	 * @param segment the segment under test (presumably must hold at least {@code num * target.length} bytes)
+	 * @param target the array that receives every read
+	 * @param num number of reads per pass
+	 * @param rounds number of passes
+	 * @return elapsed time of all passes in nanoseconds, measured with {@link System#nanoTime()}
+	 */
+	private static long timeGetByteArrayAbstract(final MemorySegment segment, final byte[] target, final int num, final int rounds) {
+		final int len = target.length;
+
+		// loop body matches the other timeGetByteArray* variants, so timing differences should reflect the segment type
+		long start = System.nanoTime();
+		for (int round = 0; round < rounds; round++) {
+			int offset = 0;
+			for (int i = 0; i < num; i++) {
+				segment.get(offset, target, 0, len);
+				offset += len;
+			}
+		}
+		long end = System.nanoTime();
+		return end - start;
+	}
+
+//	// --------------------------------------------------------------------------------------------
+//	//                                  LONG BIG ENDIAN
+//	// --------------------------------------------------------------------------------------------
+//
+//	private static void testPutLongsBigEndian(byte[] heapMemory, ByteBuffer offHeapMemory, int numValues, int rounds) {
+//		// test the pure heap memory 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHeapMemorySegment heapMemorySegment = new PureHeapMemorySegment(heapMemory);
+//		long elapsedOnHeap = timePutLongsBigEndianOnHeap(heapMemorySegment, numValues, rounds);
+//
+//		// test the pure off-heap memory
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureOffHeapMemorySegment offHeapMemorySegment = new PureOffHeapMemorySegment(offHeapMemory);
+//		long elapsedOffHeap = timePutLongsBigEndianOffHeap(offHeapMemorySegment, numValues, rounds);
+//
+//		// test hybrid on heap 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOnHeap = new PureHybridMemorySegment(heapMemory);
+//		long elapsedHybridOnHeap = timePutLongsBigEndianHybrid(hybridOnHeap, numValues, rounds);
+//
+//		// test hybrid off heap 
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOffeap = new PureHybridMemorySegment(offHeapMemory);
+//		long elapsedHybridOffHeap = timePutLongsBigEndianHybrid(hybridOffeap, numValues, rounds);
+//
+//		System.out.println(String.format(
+//				"Writing %d x %d big-endian longs to %d bytes segment: " +
+//						"heap=%,d msecs, " +
+//						"off-heap=%,d msecs, " +
+//						"hybrid-on-heap=%,d msecs, " +
+//						"hybrid-off-heap=%,d msecs",
+//				rounds, numValues, heapMemory.length,
+//				(elapsedOnHeap / 1000000), (elapsedOffHeap / 1000000),
+//				(elapsedHybridOnHeap / 1000000), (elapsedHybridOffHeap / 1000000)));
+//	}
+//
+//	private static void testGetLongsBigEndian(byte[] heapMemory, ByteBuffer offHeapMemory, int numValues, int rounds) {
+//		// test the pure heap memory 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHeapMemorySegment heapMemorySegment = new PureHeapMemorySegment(heapMemory);
+//		long elapsedOnHeap = timeGetLongsBigEndianOnHeap(heapMemorySegment, numValues, rounds);
+//
+//		// test the pure off-heap memory
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureOffHeapMemorySegment offHeapMemorySegment = new PureOffHeapMemorySegment(offHeapMemory);
+//		long elapsedOffHeap = timeGetLongsBigEndianOffHeap(offHeapMemorySegment, numValues, rounds);
+//
+//		// test hybrid on heap 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOnHeap = new PureHybridMemorySegment(heapMemory);
+//		long elapsedHybridOnHeap = timeGetLongsBigEndianHybrid(hybridOnHeap, numValues, rounds);
+//
+//		// test hybrid off heap 
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOffeap = new PureHybridMemorySegment(offHeapMemory);
+//		long elapsedHybridOffHeap = timeGetLongsBigEndianHybrid(hybridOffeap, numValues, rounds);
+//
+//		System.out.println(String.format(
+//				"Reading %d x %d big-endian longs from %d bytes segment: " +
+//						"heap=%,d msecs, " +
+//						"off-heap=%,d msecs, " +
+//						"hybrid-on-heap=%,d msecs, " +
+//						"hybrid-off-heap=%,d msecs",
+//				rounds, numValues, heapMemory.length,
+//				(elapsedOnHeap / 1000000), (elapsedOffHeap / 1000000),
+//				(elapsedHybridOnHeap / 1000000), (elapsedHybridOffHeap / 1000000)));
+//	}
+//
+//	private static long timePutLongsBigEndianOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongBigEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timePutLongsBigEndianOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongBigEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timePutLongsBigEndianHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongBigEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsBigEndianOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongBigEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsBigEndianOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongBigEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsBigEndianHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongBigEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+//
+//	// --------------------------------------------------------------------------------------------
+//	//                                  LONG LITTLE ENDIAN
+//	// --------------------------------------------------------------------------------------------
+//
+//	private static void testPutLongsLittleEndian(byte[] heapMemory, ByteBuffer offHeapMemory, int numValues, int rounds) {
+//		// test the pure heap memory 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHeapMemorySegment heapMemorySegment = new PureHeapMemorySegment(heapMemory);
+//		long elapsedOnHeap = timePutLongsLittleEndianOnHeap(heapMemorySegment, numValues, rounds);
+//
+//		// test the pure off-heap memory
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureOffHeapMemorySegment offHeapMemorySegment = new PureOffHeapMemorySegment(offHeapMemory);
+//		long elapsedOffHeap = timePutLongsLittleEndianOffHeap(offHeapMemorySegment, numValues, rounds);
+//
+//		// test hybrid on heap 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOnHeap = new PureHybridMemorySegment(heapMemory);
+//		long elapsedHybridOnHeap = timePutLongsLittleEndianHybrid(hybridOnHeap, numValues, rounds);
+//
+//		// test hybrid off heap 
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOffeap = new PureHybridMemorySegment(offHeapMemory);
+//		long elapsedHybridOffHeap = timePutLongsLittleEndianHybrid(hybridOffeap, numValues, rounds);
+//
+//		System.out.println(String.format(
+//				"Writing %d x %d little-endian longs to %d bytes segment: " +
+//						"heap=%,d msecs, " +
+//						"off-heap=%,d msecs, " +
+//						"hybrid-on-heap=%,d msecs, " +
+//						"hybrid-off-heap=%,d msecs",
+//				rounds, numValues, heapMemory.length,
+//				(elapsedOnHeap / 1000000), (elapsedOffHeap / 1000000),
+//				(elapsedHybridOnHeap / 1000000), (elapsedHybridOffHeap / 1000000)));
+//	}
+//
+//	private static void testGetLongsLittleEndian(byte[] heapMemory, ByteBuffer offHeapMemory, int numValues, int rounds) {
+//		// test the pure heap memory 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHeapMemorySegment heapMemorySegment = new PureHeapMemorySegment(heapMemory);
+//		long elapsedOnHeap = timeGetLongsLittleEndianOnHeap(heapMemorySegment, numValues, rounds);
+//
+//		// test the pure off-heap memory
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureOffHeapMemorySegment offHeapMemorySegment = new PureOffHeapMemorySegment(offHeapMemory);
+//		long elapsedOffHeap = timeGetLongsLittleEndianOffHeap(offHeapMemorySegment, numValues, rounds);
+//
+//		// test hybrid on heap 
+//		fillOnHeap(heapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOnHeap = new PureHybridMemorySegment(heapMemory);
+//		long elapsedHybridOnHeap = timeGetLongsLittleEndianHybrid(hybridOnHeap, numValues, rounds);
+//
+//		// test hybrid off heap 
+//		fillOffHeap(offHeapMemory, (byte) 0);
+//		PureHybridMemorySegment hybridOffeap = new PureHybridMemorySegment(offHeapMemory);
+//		long elapsedHybridOffHeap = timeGetLongsLittleEndianHybrid(hybridOffeap, numValues, rounds);
+//
+//		System.out.println(String.format(
+//				"Reading %d x %d little-endian longs from %d bytes segment: " +
+//						"heap=%,d msecs, " +
+//						"off-heap=%,d msecs, " +
+//						"hybrid-on-heap=%,d msecs, " +
+//						"hybrid-off-heap=%,d msecs",
+//				rounds, numValues, heapMemory.length,
+//				(elapsedOnHeap / 1000000), (elapsedOffHeap / 1000000),
+//				(elapsedHybridOnHeap / 1000000), (elapsedHybridOffHeap / 1000000)));
+//	}
+//
+//	private static long timePutLongsLittleEndianOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongLittleEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timePutLongsLittleEndianOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongLittleEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timePutLongsLittleEndianHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				segment.putLongLittleEndian(offset, LONG_VALUE);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsLittleEndianOnHeap(final PureHeapMemorySegment segment, final int num, final int rounds) {
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongLittleEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsLittleEndianOffHeap(final PureOffHeapMemorySegment segment, final int num, final int rounds) {
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongLittleEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+//
+//	private static long timeGetLongsLittleEndianHybrid(final PureHybridMemorySegment segment, final int num, final int rounds) {
+//		// checked segment
+//		long l = 0;
+//		long start = System.nanoTime();
+//		for (int round = 0; round < rounds; round++) {
+//			int offset = 0;
+//			for (int i = 0; i < num; i++) {
+//				l += segment.getLongLittleEndian(offset);
+//				offset += 8;
+//			}
+//		}
+//		long end = System.nanoTime();
+//		sideEffect += l;
+//		return end - start;
+//	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sets every byte of {@code buffer} to {@code data}, front to back.
+	 *
+	 * @param buffer the array to overwrite
+	 * @param data the value stored at every position
+	 */
+	private static void fillOnHeap(byte[] buffer, byte data) {
+		int pos = 0;
+		while (pos < buffer.length) {
+			buffer[pos++] = data;
+		}
+	}
+	
+	/**
+	 * Sets every byte of {@code buffer}, up to its capacity, to {@code data}, front to back.
+	 * Uses absolute puts, so the buffer's position is left untouched.
+	 *
+	 * @param buffer the buffer to overwrite
+	 * @param data the value stored at every position
+	 */
+	private static void fillOffHeap(ByteBuffer buffer, byte data) {
+		final int capacity = buffer.capacity();
+		int pos = 0;
+		while (pos < capacity) {
+			buffer.put(pos, data);
+			pos++;
+		}
+	}
+	
+	/**
+	 * Executes every runner {@code numRuns} times, shuffling the execution order anew for each run,
+	 * and returns one aggregated value per runner: the mean of its measurements after discarding
+	 * exactly one smallest and one largest measurement.
+	 *
+	 * <p>Null entries in {@code runners} are not executed; they record 0 for every run and thus
+	 * aggregate to 0.
+	 *
+	 * @param runners the benchmark variants; entries may be null
+	 * @param rnd source of randomness for the per-run shuffle
+	 * @param numRuns how often each runner is executed; must be at least 3
+	 * @param printMeasures if true, prints each runner's individual measurements divided by 1,000,000
+	 * @return the aggregated value per runner, indexed like {@code runners}, in the unit returned by
+	 *         {@link TestRunner#runTest()}
+	 * @throws IllegalArgumentException if {@code numRuns} is less than 3
+	 */
+	private static long[] runTestsInRandomOrder(TestRunner[] runners, Random rnd, int numRuns, boolean printMeasures) {
+		if (numRuns < 3) {
+			throw new IllegalArgumentException("must do at least three runs");
+		}
+		
+		// we run all runners in random order, to account for the JIT effects that specialize methods
+		// The observation is that either earlier tests suffer from performance because the JIT needs to kick
+		// in first, or that later tests suffer from performance, because the JIT optimized for the other case already
+		
+		long[][] measures = new long[runners.length][numRuns];
+		
+		for (int test = 0; test < numRuns; test++) {
+			System.out.println("Round " + (test+1) + '/' + numRuns);
+			
+			// pick an order for the tests: Fisher-Yates shuffle of the identity permutation
+			int[] order = new int[runners.length];
+			for (int i = 0; i < order.length; i++) {
+				order[i] = i;
+			}
+			for (int i = order.length; i > 1; i--) {
+				int pos1 = i-1;
+				int pos2 = rnd.nextInt(i);
+				int tmp = order[pos1];
+				order[pos1] = order[pos2];
+				order[pos2] = tmp;
+			}
+			
+			// run tests; a missing (null) runner records 0
+			for (int pos : order) {
+				TestRunner next = runners[pos];
+				measures[pos][test] = next != null ? next.runTest() : 0L;
+			}
+		}
+		
+		if (printMeasures) {
+			for (long[] series : measures) {
+				StringBuilder bld = new StringBuilder();
+				for (long measure : series) {
+					bld.append(String.format("%,d", (measure / 1000000))).append(" | ");
+				}
+				System.out.println(bld.toString());
+			}
+		}
+		
+		// aggregate the measures: drop exactly one min and one max, average the rest.
+		// Subtracting min and max from the sum (rather than skipping values equal to them) keeps the
+		// divisor correct even when several runs produce the same min or max value.
+		long[] results = new long[runners.length];
+		
+		for (int i = 0; i < runners.length; i++) {
+			long sum = 0L;
+			long max = Long.MIN_VALUE;
+			long min = Long.MAX_VALUE;
+			
+			for (long val : measures[i]) {
+				sum += val;
+				max = Math.max(max, val);
+				min = Math.min(min, val);
+			}
+			
+			results[i] = (sum - min - max) / (numRuns - 2);
+		}
+		
+		return results;
+	}
+	
+	
+	
+	/**
+	 * One benchmark variant, executed repeatedly by runTestsInRandomOrder.
+	 * (Nested interfaces are implicitly static, so no modifier is needed.)
+	 */
+	private interface TestRunner {
+
+		/**
+		 * Prepares the memory, runs the measured work once, and returns its elapsed time.
+		 *
+		 * @return the measured time of a single execution
+		 */
+		long runTest();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
new file mode 100644
index 0000000..e247eed
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegment.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory.benchmarks;
+
+import org.apache.flink.core.memory.MemoryUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A memory segment that is backed purely by a heap {@code byte[]}, used by the benchmarks in this
+ * package. Single-byte and bulk accesses go through the array (relying on the JVM's array bounds
+ * checks); multi-byte primitive accesses use {@code sun.misc.Unsafe} after an explicit bounds check.
+ */
+public final class PureHeapMemorySegment {
+
+	/** Constant that flags the byte order. Because this is a boolean constant,
+	 * the JIT compiler can use this well to aggressively eliminate the non-applicable code paths */
+	private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+
+	/** The array in which the data is stored; set to {@code null} by {@link #free()}. */
+	private byte[] memory;
+
+	/** Wrapper for I/O requests, created lazily and reused by {@link #wrap(int, int)}. */
+	private ByteBuffer wrapper;
+
+	/** The size, stored extra, because we may clear the reference to the byte array */
+	private final int size;
+
+	// -------------------------------------------------------------------------
+	//                             Constructors
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Creates a new memory segment that represents the data in the given byte array.
+	 *
+	 * @param memory The byte array that holds the data.
+	 */
+	public PureHeapMemorySegment(byte[] memory) {
+		this.memory = memory;
+		this.size = memory.length;
+	}
+
+	// -------------------------------------------------------------------------
+	//                      Heap Memory Segment Specifics
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Gets the byte array that backs this memory segment.
+	 *
+	 * @return The byte array that backs this memory segment, or {@code null} after {@link #free()}.
+	 */
+	public byte[] getArray() {
+		return this.memory;
+	}
+
+	// -------------------------------------------------------------------------
+	//                        MemorySegment Accessors
+	// -------------------------------------------------------------------------
+
+	/** Checks whether {@link #free()} has been called, i.e. the backing array reference was dropped. */
+	public final boolean isFreed() {
+		return this.memory == null;
+	}
+
+	/**
+	 * Drops the references to the backing array and the I/O wrapper. Accessors that touch the
+	 * array fail afterwards (they dereference a {@code null} array).
+	 */
+	public final void free() {
+		this.wrapper = null;
+		this.memory = null;
+	}
+
+	/** Returns the size of the segment in bytes; still valid after {@link #free()}. */
+	public final int size() {
+		return this.size;
+	}
+
+	/**
+	 * Returns a {@link ByteBuffer} view over {@code [offset, offset + length)} of this segment.
+	 * The same buffer instance is reused across calls; only its position and limit are reset.
+	 *
+	 * @param offset The start of the wrapped region.
+	 * @param length The number of bytes in the wrapped region.
+	 * @return The (shared) wrapper buffer, positioned at {@code offset} with limit {@code offset + length}.
+	 * @throws IndexOutOfBoundsException If the offset or length is negative, or the region exceeds the segment.
+	 */
+	public final ByteBuffer wrap(int offset, int length) {
+		// Validate negatives explicitly: otherwise the reuse path below would surface them as
+		// IllegalArgumentException from limit()/position() instead of IndexOutOfBoundsException.
+		// With length >= 0, "offset > memory.length - length" also covers "offset > memory.length".
+		if (offset < 0 || length < 0 || offset > this.memory.length - length) {
+			throw new IndexOutOfBoundsException(
+					"offset=" + offset + ", length=" + length + ", size=" + this.memory.length);
+		}
+
+		if (this.wrapper == null) {
+			this.wrapper = ByteBuffer.wrap(this.memory, offset, length);
+		}
+		else {
+			// set the limit first so that the new position is always within [0, limit]
+			this.wrapper.limit(offset + length);
+			this.wrapper.position(offset);
+		}
+
+		return this.wrapper;
+	}
+
+	// ------------------------------------------------------------------------
+	//                    Random Access get() and put() methods
+	// ------------------------------------------------------------------------
+
+	public final byte get(int index) {
+		return this.memory[index];
+	}
+
+	public final void put(int index, byte b) {
+		this.memory[index] = b;
+	}
+
+	public final void get(int index, byte[] dst) {
+		get(index, dst, 0, dst.length);
+	}
+
+	public final void put(int index, byte[] src) {
+		put(index, src, 0, src.length);
+	}
+
+	public final void get(int index, byte[] dst, int offset, int length) {
+		// system arraycopy does the boundary checks anyways, no need to check extra
+		System.arraycopy(this.memory, index, dst, offset, length);
+	}
+
+	public final void put(int index, byte[] src, int offset, int length) {
+		// system arraycopy does the boundary checks anyways, no need to check extra
+		System.arraycopy(src, offset, this.memory, index, length);
+	}
+
+	/** Reads a boolean; any non-zero byte is {@code true}. */
+	public final boolean getBoolean(int index) {
+		return this.memory[index] != 0;
+	}
+
+	/** Writes a boolean as the byte {@code 1} ({@code true}) or {@code 0} ({@code false}). */
+	public final void putBoolean(int index, boolean value) {
+		this.memory[index] = (byte) (value ? 1 : 0);
+	}
+
+	// The plain multi-byte accessors below read/write in the platform's native byte order via
+	// Unsafe (bounds are checked explicitly first, since Unsafe performs no checks). The
+	// LittleEndian / BigEndian variants swap bytes when the native order differs.
+
+	@SuppressWarnings("restriction")
+	public final char getChar(int index) {
+		if (index >= 0 && index <= this.memory.length - 2) {
+			return UNSAFE.getChar(this.memory, BASE_OFFSET + index);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final char getCharLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getChar(index);
+		} else {
+			return Character.reverseBytes(getChar(index));
+		}
+	}
+
+	public final char getCharBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Character.reverseBytes(getChar(index));
+		} else {
+			return getChar(index);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final void putChar(int index, char value) {
+		if (index >= 0 && index <= this.memory.length - 2) {
+			UNSAFE.putChar(this.memory, BASE_OFFSET + index, value);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final void putCharLittleEndian(int index, char value) {
+		if (LITTLE_ENDIAN) {
+			putChar(index, value);
+		} else {
+			putChar(index, Character.reverseBytes(value));
+		}
+	}
+
+	public final void putCharBigEndian(int index, char value) {
+		if (LITTLE_ENDIAN) {
+			putChar(index, Character.reverseBytes(value));
+		} else {
+			putChar(index, value);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final short getShort(int index) {
+		if (index >= 0 && index <= this.memory.length - 2) {
+			return UNSAFE.getShort(this.memory, BASE_OFFSET + index);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final short getShortLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getShort(index);
+		} else {
+			return Short.reverseBytes(getShort(index));
+		}
+	}
+
+	public final short getShortBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Short.reverseBytes(getShort(index));
+		} else {
+			return getShort(index);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final void putShort(int index, short value) {
+		if (index >= 0 && index <= this.memory.length - 2) {
+			UNSAFE.putShort(this.memory, BASE_OFFSET + index, value);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final void putShortLittleEndian(int index, short value) {
+		if (LITTLE_ENDIAN) {
+			putShort(index, value);
+		} else {
+			putShort(index, Short.reverseBytes(value));
+		}
+	}
+
+	public final void putShortBigEndian(int index, short value) {
+		if (LITTLE_ENDIAN) {
+			putShort(index, Short.reverseBytes(value));
+		} else {
+			putShort(index, value);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final int getInt(int index) {
+		if (index >= 0 && index <= this.memory.length - 4) {
+			return UNSAFE.getInt(this.memory, BASE_OFFSET + index);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final int getIntLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getInt(index);
+		} else {
+			return Integer.reverseBytes(getInt(index));
+		}
+	}
+
+	public final int getIntBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Integer.reverseBytes(getInt(index));
+		} else {
+			return getInt(index);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final void putInt(int index, int value) {
+		if (index >= 0 && index <= this.memory.length - 4) {
+			UNSAFE.putInt(this.memory, BASE_OFFSET + index, value);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final void putIntLittleEndian(int index, int value) {
+		if (LITTLE_ENDIAN) {
+			putInt(index, value);
+		} else {
+			putInt(index, Integer.reverseBytes(value));
+		}
+	}
+
+	public final void putIntBigEndian(int index, int value) {
+		if (LITTLE_ENDIAN) {
+			putInt(index, Integer.reverseBytes(value));
+		} else {
+			putInt(index, value);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final long getLong(int index) {
+		if (index >= 0 && index <= this.memory.length - 8) {
+			return UNSAFE.getLong(this.memory, BASE_OFFSET + index);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final long getLongLittleEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return getLong(index);
+		} else {
+			return Long.reverseBytes(getLong(index));
+		}
+	}
+
+	public final long getLongBigEndian(int index) {
+		if (LITTLE_ENDIAN) {
+			return Long.reverseBytes(getLong(index));
+		} else {
+			return getLong(index);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	public final void putLong(int index, long value) {
+		if (index >= 0 && index <= this.memory.length - 8) {
+			UNSAFE.putLong(this.memory, BASE_OFFSET + index, value);
+		} else {
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	public final void putLongLittleEndian(int index, long value) {
+		if (LITTLE_ENDIAN) {
+			putLong(index, value);
+		} else {
+			putLong(index, Long.reverseBytes(value));
+		}
+	}
+
+	public final void putLongBigEndian(int index, long value) {
+		if (LITTLE_ENDIAN) {
+			putLong(index, Long.reverseBytes(value));
+		} else {
+			putLong(index, value);
+		}
+	}
+
+	// Floating point accessors delegate to the int/long accessors via raw bit conversions.
+
+	public final float getFloat(int index) {
+		return Float.intBitsToFloat(getInt(index));
+	}
+
+	public final float getFloatLittleEndian(int index) {
+		return Float.intBitsToFloat(getIntLittleEndian(index));
+	}
+
+	public final float getFloatBigEndian(int index) {
+		return Float.intBitsToFloat(getIntBigEndian(index));
+	}
+
+	public final void putFloat(int index, float value) {
+		putInt(index, Float.floatToRawIntBits(value));
+	}
+
+	public final void putFloatLittleEndian(int index, float value) {
+		putIntLittleEndian(index, Float.floatToRawIntBits(value));
+	}
+
+	public final void putFloatBigEndian(int index, float value) {
+		putIntBigEndian(index, Float.floatToRawIntBits(value));
+	}
+
+	public final double getDouble(int index) {
+		return Double.longBitsToDouble(getLong(index));
+	}
+
+	public final double getDoubleLittleEndian(int index) {
+		return Double.longBitsToDouble(getLongLittleEndian(index));
+	}
+
+	public final double getDoubleBigEndian(int index) {
+		return Double.longBitsToDouble(getLongBigEndian(index));
+	}
+
+	public final void putDouble(int index, double value) {
+		putLong(index, Double.doubleToRawLongBits(value));
+	}
+
+	public final void putDoubleLittleEndian(int index, double value) {
+		putLongLittleEndian(index, Double.doubleToRawLongBits(value));
+	}
+
+	public final void putDoubleBigEndian(int index, double value) {
+		putLongBigEndian(index, Double.doubleToRawLongBits(value));
+	}
+
+	// -------------------------------------------------------------------------
+	//                     Bulk Read and Write Methods
+	// -------------------------------------------------------------------------
+
+	/** Writes {@code length} bytes starting at {@code offset} to the given output. */
+	public final void get(DataOutput out, int offset, int length) throws IOException {
+		out.write(this.memory, offset, length);
+	}
+
+	/** Fills {@code length} bytes starting at {@code offset} from the given input. */
+	public final void put(DataInput in, int offset, int length) throws IOException {
+		in.readFully(this.memory, offset, length);
+	}
+
+	public final void get(int offset, ByteBuffer target, int numBytes) {
+		// ByteBuffer performs the boundary checks
+		target.put(this.memory, offset, numBytes);
+	}
+
+	public final void put(int offset, ByteBuffer source, int numBytes) {
+		// ByteBuffer performs the boundary checks
+		source.get(this.memory, offset, numBytes);
+	}
+
+	public final void copyTo(int offset, PureHeapMemorySegment target, int targetOffset, int numBytes) {
+		// system arraycopy does the boundary checks anyways, no need to check extra
+		System.arraycopy(this.memory, offset, target.memory, targetOffset, numBytes);
+	}
+
+	// -------------------------------------------------------------------------
+	//                      Comparisons & Swapping
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Compares {@code len} bytes of this segment (from {@code offset1}) with {@code seg2}
+	 * (from {@code offset2}), treating bytes as unsigned.
+	 *
+	 * @return The difference of the first mismatching unsigned byte pair, or 0 if all bytes are equal.
+	 */
+	public final int compare(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
+		final byte[] b1 = this.memory;
+		final byte[] b2 = seg2.memory;
+
+		for (int pos = 0; pos < len; pos++) {
+			final int diff = (b1[offset1 + pos] & 0xff) - (b2[offset2 + pos] & 0xff);
+			if (diff != 0) {
+				return diff;
+			}
+		}
+		return 0;
+	}
+
+	/**
+	 * Swaps {@code len} bytes between this segment and {@code seg2}, eight bytes at a time
+	 * first and then byte by byte for the remainder.
+	 */
+	public final void swapBytes(PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
+		// swap by bytes (chunks of 8 first, then single bytes)
+		while (len >= 8) {
+			long tmp = this.getLong(offset1);
+			this.putLong(offset1, seg2.getLong(offset2));
+			seg2.putLong(offset2, tmp);
+			offset1 += 8;
+			offset2 += 8;
+			len -= 8;
+		}
+		while (len > 0) {
+			byte tmp = this.get(offset1);
+			this.put(offset1, seg2.get(offset2));
+			seg2.put(offset2, tmp);
+			offset1++;
+			offset2++;
+			len--;
+		}
+	}
+
+	/**
+	 * Swaps {@code len} bytes between this segment and {@code seg2} using three array copies
+	 * through {@code auxBuffer}, which must hold at least {@code len} bytes.
+	 */
+	public final void swapBytes(byte[] auxBuffer, PureHeapMemorySegment seg2, int offset1, int offset2, int len) {
+		byte[] otherMem = seg2.memory;
+		System.arraycopy(this.memory, offset1, auxBuffer, 0, len);
+		System.arraycopy(otherMem, offset2, this.memory, offset1, len);
+		System.arraycopy(auxBuffer, 0, otherMem, offset2, len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                     Utilities for native memory accesses and checks
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+	/** Offset of the first element of a {@code byte[]}, needed for Unsafe array accesses. */
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
new file mode 100644
index 0000000..1e3b89e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureHeapMemorySegmentOutView.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory.benchmarks;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.List;
+
+/**
+ * A {@link DataOutputView} that writes big-endian data into a sequence of
+ * {@link PureHeapMemorySegment}s. Segments are taken from the end of an "empty" list and
+ * appended to a "full" target list as they are started.
+ */
+public final class PureHeapMemorySegmentOutView implements DataOutputView {
+
+	private PureHeapMemorySegment currentSegment;	// the current memory segment to write to
+
+	private int positionInSegment;					// the offset in the current segment
+
+	private final int segmentSize;				// the size of the memory segments
+
+	private final List<PureHeapMemorySegment> memorySource;	// segments still available for writing
+
+	private final List<PureHeapMemorySegment> fullSegments;	// segments that have been written to
+
+
+	private byte[] utfBuffer;		// the reusable array for UTF encodings
+
+
+	/**
+	 * Creates the view and immediately takes the last segment of {@code emptySegments} as the
+	 * first segment to write to.
+	 *
+	 * @param emptySegments Source of free segments; consumed from the end. Must not be empty.
+	 * @param fullSegmentTarget List to which every segment is added once writing to it starts.
+	 * @param segmentSize The size of each segment in bytes.
+	 */
+	public PureHeapMemorySegmentOutView(List<PureHeapMemorySegment> emptySegments,
+										List<PureHeapMemorySegment> fullSegmentTarget, int segmentSize) {
+		this.segmentSize = segmentSize;
+		this.currentSegment = emptySegments.remove(emptySegments.size() - 1);
+
+		this.memorySource = emptySegments;
+		this.fullSegments = fullSegmentTarget;
+		this.fullSegments.add(getCurrentSegment());
+	}
+
+
+	/**
+	 * Restarts writing on a fresh segment. The caller must have drained the full-segment list.
+	 *
+	 * @throws IllegalStateException If the target list still contains segments.
+	 */
+	public void reset() {
+		if (this.fullSegments.size() != 0) {
+			throw new IllegalStateException("The target list still contains memory segments.");
+		}
+
+		clear();
+		try {
+			advance();
+		}
+		catch (IOException ioex) {
+			throw new RuntimeException("Error getting first segment for record collector.", ioex);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                  Page Management
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Takes the next segment from the end of the source list and records it in the full list.
+	 * The arguments are unused.
+	 *
+	 * @throws EOFException If no free segments remain.
+	 */
+	public PureHeapMemorySegment nextSegment(PureHeapMemorySegment current, int positionInCurrent) throws EOFException {
+		int size = this.memorySource.size();
+		if (size > 0) {
+			final PureHeapMemorySegment next = this.memorySource.remove(size - 1);
+			this.fullSegments.add(next);
+			return next;
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	public PureHeapMemorySegment getCurrentSegment() {
+		return this.currentSegment;
+	}
+
+	public int getCurrentPositionInSegment() {
+		return this.positionInSegment;
+	}
+
+	public int getSegmentSize() {
+		return this.segmentSize;
+	}
+
+	/** Moves writing to the start of the next segment. */
+	protected void advance() throws IOException {
+		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
+		this.positionInSegment = 0;
+	}
+
+	protected void seekOutput(PureHeapMemorySegment seg, int position) {
+		this.currentSegment = seg;
+		this.positionInSegment = position;
+	}
+
+	protected void clear() {
+		this.currentSegment = null;
+		this.positionInSegment = 0;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                               Data Output Specific methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void write(int b) throws IOException {
+		writeByte(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		write(b, 0, b.length);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		int remaining = this.segmentSize - this.positionInSegment;
+		if (remaining >= len) {
+			// fast path: everything fits into the current segment
+			this.currentSegment.put(this.positionInSegment, b, off, len);
+			this.positionInSegment += len;
+		}
+		else {
+			if (remaining == 0) {
+				advance();
+				remaining = this.segmentSize - this.positionInSegment;
+			}
+			// fill the current segment, then continue in fresh segments until all bytes are written
+			while (true) {
+				int toPut = Math.min(remaining, len);
+				this.currentSegment.put(this.positionInSegment, b, off, toPut);
+				off += toPut;
+				len -= toPut;
+
+				if (len > 0) {
+					this.positionInSegment = this.segmentSize;
+					advance();
+					remaining = this.segmentSize - this.positionInSegment;
+				}
+				else {
+					this.positionInSegment += toPut;
+					break;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void writeBoolean(boolean v) throws IOException {
+		writeByte(v ? 1 : 0);
+	}
+
+	@Override
+	public void writeByte(int v) throws IOException {
+		if (this.positionInSegment < this.segmentSize) {
+			this.currentSegment.put(this.positionInSegment++, (byte) v);
+		}
+		else {
+			advance();
+			writeByte(v);
+		}
+	}
+
+	// The multi-byte writers below use a single big-endian put when the value fits into the
+	// current segment, advance when the segment is exactly full, and otherwise fall back to
+	// writing the value byte by byte across the segment boundary.
+
+	@Override
+	public void writeShort(int v) throws IOException {
+		if (this.positionInSegment < this.segmentSize - 1) {
+			this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
+			this.positionInSegment += 2;
+		}
+		else if (this.positionInSegment == this.segmentSize) {
+			advance();
+			writeShort(v);
+		}
+		else {
+			writeByte(v >> 8);
+			writeByte(v);
+		}
+	}
+
+	@Override
+	public void writeChar(int v) throws IOException {
+		if (this.positionInSegment < this.segmentSize - 1) {
+			this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
+			this.positionInSegment += 2;
+		}
+		else if (this.positionInSegment == this.segmentSize) {
+			advance();
+			writeChar(v);
+		}
+		else {
+			writeByte(v >> 8);
+			writeByte(v);
+		}
+	}
+
+	@Override
+	public void writeInt(int v) throws IOException {
+		if (this.positionInSegment < this.segmentSize - 3) {
+			this.currentSegment.putIntBigEndian(this.positionInSegment, v);
+			this.positionInSegment += 4;
+		}
+		else if (this.positionInSegment == this.segmentSize) {
+			advance();
+			writeInt(v);
+		}
+		else {
+			writeByte(v >> 24);
+			writeByte(v >> 16);
+			writeByte(v >>  8);
+			writeByte(v);
+		}
+	}
+
+	@Override
+	public void writeLong(long v) throws IOException {
+		if (this.positionInSegment < this.segmentSize - 7) {
+			this.currentSegment.putLongBigEndian(this.positionInSegment, v);
+			this.positionInSegment += 8;
+		}
+		else if (this.positionInSegment == this.segmentSize) {
+			advance();
+			writeLong(v);
+		}
+		else {
+			writeByte((int) (v >> 56));
+			writeByte((int) (v >> 48));
+			writeByte((int) (v >> 40));
+			writeByte((int) (v >> 32));
+			writeByte((int) (v >> 24));
+			writeByte((int) (v >> 16));
+			writeByte((int) (v >>  8));
+			writeByte((int) v);
+		}
+	}
+
+	@Override
+	public void writeFloat(float v) throws IOException {
+		writeInt(Float.floatToRawIntBits(v));
+	}
+
+	@Override
+	public void writeDouble(double v) throws IOException {
+		writeLong(Double.doubleToRawLongBits(v));
+	}
+
+	@Override
+	public void writeBytes(String s) throws IOException {
+		// writes the low eight bits of each char (writeByte truncates to a byte)
+		for (int i = 0; i < s.length(); i++) {
+			writeByte(s.charAt(i));
+		}
+	}
+
+	@Override
+	public void writeChars(String s) throws IOException {
+		for (int i = 0; i < s.length(); i++) {
+			writeChar(s.charAt(i));
+		}
+	}
+
+	/**
+	 * Writes the string in modified UTF-8, preceded by a two-byte length (the same encoding as
+	 * {@link java.io.DataOutputStream#writeUTF(String)}).
+	 *
+	 * @throws UTFDataFormatException If the encoded string is longer than 65535 bytes.
+	 */
+	@Override
+	public void writeUTF(String str) throws IOException {
+		int strlen = str.length();
+		int utflen = 0;
+		int c, count = 0;
+
+		/* use charAt instead of copying String to char array */
+		for (int i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				utflen++;
+			} else if (c > 0x07FF) {
+				utflen += 3;
+			} else {
+				utflen += 2;
+			}
+		}
+
+		if (utflen > 65535) {
+			throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+		}
+
+		if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
+			this.utfBuffer = new byte[utflen + 2];
+		}
+		final byte[] bytearr = this.utfBuffer;
+
+		// two-byte big-endian length prefix
+		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+		bytearr[count++] = (byte) (utflen & 0xFF);
+
+		// fast path for the leading run of single-byte (ASCII, non-zero) characters
+		int i = 0;
+		for (i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if (!((c >= 0x0001) && (c <= 0x007F))) {
+				break;
+			}
+			bytearr[count++] = (byte) c;
+		}
+
+		for (; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				bytearr[count++] = (byte) c;
+
+			} else if (c > 0x07FF) {
+				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+			} else {
+				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+			}
+		}
+
+		write(bytearr, 0, utflen + 2);
+	}
+
+	@Override
+	public void skipBytesToWrite(int numBytes) throws IOException {
+		while (numBytes > 0) {
+			final int remaining = this.segmentSize - this.positionInSegment;
+			if (numBytes <= remaining) {
+				this.positionInSegment += numBytes;
+				return;
+			}
+			this.positionInSegment = this.segmentSize;
+			advance();
+			numBytes -= remaining;
+		}
+	}
+
+	@Override
+	public void write(DataInputView source, int numBytes) throws IOException {
+		while (numBytes > 0) {
+			final int remaining = this.segmentSize - this.positionInSegment;
+			if (numBytes <= remaining) {
+				this.currentSegment.put(source, this.positionInSegment, numBytes);
+				this.positionInSegment += numBytes;
+				return;
+			}
+
+			if (remaining > 0) {
+				this.currentSegment.put(source, this.positionInSegment, remaining);
+				this.positionInSegment = this.segmentSize;
+				numBytes -= remaining;
+			}
+
+			advance();
+		}
+	}
+}