You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:59:05 UTC

[15/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant of the managed memory

[FLINK-1320] [core] Add an off-heap variant of the managed memory

This closes #1093


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655a891d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655a891d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655a891d

Branch: refs/heads/master
Commit: 655a891d929db9d858bb5c2edf54419f2b0d3ace
Parents: 1800434
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 30 22:36:46 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 8 20:58:05 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   14 +-
 .../flink/core/memory/HeapMemorySegment.java    |  203 ++
 .../flink/core/memory/HybridMemorySegment.java  |  466 ++++
 .../apache/flink/core/memory/MemorySegment.java |  938 ++++---
 .../flink/core/memory/MemorySegmentFactory.java |  211 ++
 .../apache/flink/core/memory/MemoryType.java    |   35 +
 .../apache/flink/core/memory/MemoryUtils.java   |   34 +-
 .../common/typeutils/ComparatorTestBase.java    |   10 +-
 .../flink/core/memory/CrossSegmentTypeTest.java |  356 +++
 .../core/memory/EndiannessAccessChecks.java     |  183 ++
 .../core/memory/HeapMemorySegmentTest.java      |   71 +
 .../memory/HybridOffHeapMemorySegmentTest.java  |   84 +
 .../memory/HybridOnHeapMemorySegmentTest.java   |   82 +
 .../core/memory/MemorySegmentChecksTest.java    |  135 +
 .../core/memory/MemorySegmentTestBase.java      | 2571 ++++++++++++++++++
 .../memory/MemorySegmentUndersizedTest.java     | 1367 ++++++++++
 .../memory/OperationsOnFreedSegmentTest.java    | 1195 ++++++++
 .../benchmarks/CoreMemorySegmentOutView.java    |  360 +++
 .../LongSerializationSpeedBenchmark.java        |  232 ++
 .../benchmarks/MemorySegmentSpeedBenchmark.java | 1633 +++++++++++
 .../benchmarks/PureHeapMemorySegment.java       |  466 ++++
 .../PureHeapMemorySegmentOutView.java           |  359 +++
 .../benchmarks/PureHybridMemorySegment.java     |  887 ++++++
 .../PureHybridMemorySegmentOutView.java         |  359 +++
 .../benchmarks/PureOffHeapMemorySegment.java    |  790 ++++++
 .../PureOffHeapMemorySegmentOutView.java        |  359 +++
 .../StringSerializationSpeedBenchmark.java      |  207 ++
 .../apache/flink/types/NormalizableKeyTest.java |   26 +-
 .../flink/runtime/execution/Environment.java    |    2 +-
 .../runtime/io/disk/FileChannelInputView.java   |    4 +-
 .../runtime/io/disk/FileChannelOutputView.java  |    4 +-
 .../runtime/io/disk/RandomAccessInputView.java  |    2 +-
 .../runtime/io/disk/RandomAccessOutputView.java |    2 +-
 .../io/disk/SeekableFileChannelInputView.java   |    4 +-
 .../io/disk/SimpleCollectingOutputView.java     |    2 +-
 .../flink/runtime/io/disk/SpillingBuffer.java   |    2 +-
 .../disk/iomanager/ChannelReaderInputView.java  |    2 +-
 .../disk/iomanager/ChannelWriterOutputView.java |    2 +-
 .../runtime/io/network/NetworkEnvironment.java  |    3 +-
 .../AdaptiveSpanningRecordDeserializer.java     |    6 +-
 .../api/serialization/EventSerializer.java      |    6 +-
 ...llingAdaptiveSpanningRecordDeserializer.java |    6 +-
 .../flink/runtime/io/network/buffer/Buffer.java |    5 +-
 .../io/network/buffer/NetworkBufferPool.java    |   30 +-
 .../netty/PartitionRequestClientHandler.java    |    4 +-
 .../SpilledSubpartitionViewSyncIO.java          |    3 +-
 .../iterative/io/SerializedUpdateBuffer.java    |    4 +-
 .../runtime/memory/AbstractPagedInputView.java  |  568 ++++
 .../runtime/memory/AbstractPagedOutputView.java |  414 +++
 .../runtime/memory/ListMemorySegmentSource.java |   48 +
 .../memory/MemoryAllocationException.java       |   44 +
 .../flink/runtime/memory/MemoryManager.java     |  700 +++++
 .../memorymanager/AbstractPagedInputView.java   |  566 ----
 .../memorymanager/AbstractPagedOutputView.java  |  416 ---
 .../memorymanager/CheckedMemorySegment.java     |  407 ---
 .../memorymanager/DefaultMemoryManager.java     |  490 ----
 .../memorymanager/ListMemorySegmentSource.java  |   48 -
 .../MemoryAllocationException.java              |   44 -
 .../runtime/memorymanager/MemoryManager.java    |  134 -
 .../memorymanager/SimpleMemorySegment.java      |  329 ---
 .../memorymanager/UnsafeMemorySegment.java      |  391 ---
 .../operators/AbstractOuterJoinDriver.java      |    2 +-
 .../flink/runtime/operators/CrossDriver.java    |    2 +-
 .../runtime/operators/FullOuterJoinDriver.java  |    2 +-
 .../operators/GroupReduceCombineDriver.java     |    2 +-
 .../flink/runtime/operators/JoinDriver.java     |    2 +-
 .../runtime/operators/LeftOuterJoinDriver.java  |    2 +-
 .../runtime/operators/PactTaskContext.java      |    2 +-
 .../runtime/operators/ReduceCombineDriver.java  |    2 +-
 .../runtime/operators/RegularPactTask.java      |    2 +-
 .../runtime/operators/RightOuterJoinDriver.java |    2 +-
 .../flink/runtime/operators/TempBarrier.java    |    6 +-
 .../chaining/GroupCombineChainedDriver.java     |    2 +-
 .../SynchronousChainedCombineDriver.java        |    2 +-
 .../operators/hash/CompactingHashTable.java     |    2 +-
 .../operators/hash/HashMatchIteratorBase.java   |    4 +-
 .../runtime/operators/hash/HashPartition.java   |    4 +-
 .../operators/hash/InMemoryPartition.java       |    6 +-
 .../NonReusingBuildFirstHashMatchIterator.java  |    4 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |    4 +-
 .../NonReusingBuildSecondHashMatchIterator.java |    4 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |    4 +-
 .../ReusingBuildFirstHashMatchIterator.java     |    4 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |    4 +-
 .../ReusingBuildSecondHashMatchIterator.java    |    4 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |    4 +-
 .../AbstractBlockResettableIterator.java        |    6 +-
 .../BlockResettableMutableObjectIterator.java   |    4 +-
 .../NonReusingBlockResettableIterator.java      |    4 +-
 .../ReusingBlockResettableIterator.java         |    4 +-
 .../resettable/SpillingResettableIterator.java  |    6 +-
 ...SpillingResettableMutableObjectIterator.java |    6 +-
 .../sort/AbstractMergeInnerJoinIterator.java    |    4 +-
 .../operators/sort/AbstractMergeIterator.java   |    4 +-
 .../sort/AbstractMergeOuterJoinIterator.java    |    4 +-
 .../sort/CombiningUnilateralSortMerger.java     |    4 +-
 .../operators/sort/FixedLengthRecordSorter.java |   12 +-
 .../operators/sort/LargeRecordHandler.java      |    2 +-
 .../sort/NonReusingMergeInnerJoinIterator.java  |    4 +-
 .../sort/NonReusingMergeOuterJoinIterator.java  |    4 +-
 .../operators/sort/NormalizedKeySorter.java     |   18 +-
 .../sort/ReusingMergeInnerJoinIterator.java     |    4 +-
 .../sort/ReusingMergeOuterJoinIterator.java     |    4 +-
 .../operators/sort/UnilateralSortMerger.java    |    4 +-
 .../operators/util/CoGroupTaskIterator.java     |    2 +-
 .../operators/util/JoinTaskIterator.java        |    2 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |    2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |    2 +-
 .../runtime/util/EnvironmentInformation.java    |   19 +-
 .../NetworkEnvironmentConfiguration.scala       |    4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   99 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |    6 +-
 .../io/disk/FileChannelStreamsITCase.java       |    7 +-
 .../runtime/io/disk/FileChannelStreamsTest.java |    8 +-
 .../disk/SeekableFileChannelInputViewTest.java  |   24 +-
 .../runtime/io/disk/SpillingBufferTest.java     |   31 +-
 .../AsynchronousFileIOChannelTest.java          |    8 +-
 .../BufferFileWriterFileSegmentReaderTest.java  |    7 +-
 .../iomanager/BufferFileWriterReaderTest.java   |    4 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java   |    5 +-
 .../io/disk/iomanager/IOManagerITCase.java      |    8 +-
 .../IOManagerPerformanceBenchmark.java          |    6 +-
 .../io/network/NetworkEnvironmentTest.java      |    4 +-
 .../api/serialization/PagedViewsTest.java       |   15 +-
 .../SpanningRecordSerializationTest.java        |    4 +-
 .../SpanningRecordSerializerTest.java           |    7 +-
 .../io/network/api/writer/RecordWriterTest.java |    9 +-
 .../network/buffer/BufferPoolFactoryTest.java   |    3 +-
 .../runtime/io/network/buffer/BufferTest.java   |    5 +-
 .../io/network/buffer/LocalBufferPoolTest.java  |    3 +-
 .../network/buffer/NetworkBufferPoolTest.java   |    5 +-
 .../netty/NettyMessageSerializationTest.java    |    4 +-
 .../IteratorWrappingTestSingleInputGate.java    |    4 +-
 .../consumer/LocalInputChannelTest.java         |    3 +-
 .../partition/consumer/SingleInputGateTest.java |    5 +-
 .../network/serialization/LargeRecordsTest.java |    6 +-
 .../io/network/util/TestBufferFactory.java      |    6 +-
 .../memory/MemoryManagerLazyAllocationTest.java |    7 +-
 .../flink/runtime/memory/MemoryManagerTest.java |    7 +-
 .../runtime/memory/MemorySegmentSimpleTest.java |  576 ++++
 .../memory/MemorySegmentSpeedBenchmark.java     |  867 ------
 .../flink/runtime/memory/MemorySegmentTest.java |  575 ----
 .../operators/drivers/TestTaskContext.java      |    7 +-
 .../operators/hash/CompactingHashTableTest.java |    3 +-
 .../runtime/operators/hash/HashTableITCase.java |    7 +-
 .../hash/HashTablePerformanceComparison.java    |    3 +-
 .../runtime/operators/hash/HashTableTest.java   |    3 +-
 .../operators/hash/MemoryHashTableTest.java     |    3 +-
 .../MutableHashTablePerformanceBenchmark.java   |    8 +-
 .../hash/NonReusingHashMatchIteratorITCase.java |    5 +-
 .../NonReusingReOpenableHashTableITCase.java    |    8 +-
 .../hash/ReusingHashMatchIteratorITCase.java    |    5 +-
 .../hash/ReusingReOpenableHashTableITCase.java  |    8 +-
 ...lockResettableMutableObjectIteratorTest.java |   21 +-
 .../NonReusingBlockResettableIteratorTest.java  |   11 +-
 .../ReusingBlockResettableIteratorTest.java     |   13 +-
 .../SpillingResettableIteratorTest.java         |    7 +-
 ...lingResettableMutableObjectIteratorTest.java |    6 +-
 ...bstractSortMergeOuterJoinIteratorITCase.java |    8 +-
 .../CombiningUnilateralSortMergerITCase.java    |    7 +-
 .../operators/sort/ExternalSortITCase.java      |    5 +-
 .../sort/ExternalSortLargeRecordsITCase.java    |    5 +-
 .../sort/FixedLengthRecordSorterTest.java       |   14 +-
 .../sort/LargeRecordHandlerITCase.java          |   28 +-
 .../operators/sort/LargeRecordHandlerTest.java  |   21 +-
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   17 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |    2 +-
 .../operators/sort/NormalizedKeySorterTest.java |   15 +-
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   17 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |    2 +-
 .../testutils/BinaryOperatorTestBase.java       |    5 +-
 .../operators/testutils/DriverTestBase.java     |    5 +-
 .../operators/testutils/MockEnvironment.java    |    9 +-
 .../operators/testutils/TaskTestBase.java       |    2 +-
 .../testutils/UnaryOperatorTestBase.java        |    5 +-
 .../runtime/operators/util/BloomFilterTest.java |    5 +-
 .../operators/util/HashVsSortMiniBenchmark.java |    6 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |    2 +-
 ...askManagerComponentsStartupShutdownTest.java |    8 +-
 .../flink/runtime/taskmanager/TaskTest.java     |    2 +-
 .../util/DataInputOutputSerializerTest.java     |    4 +-
 .../testingUtils/TestingTaskManager.scala       |    4 +-
 .../streaming/runtime/io/BufferSpiller.java     |    3 +-
 .../consumer/StreamTestSingleInputGate.java     |   18 +-
 .../io/BarrierBufferMassiveRandomTest.java      |    5 +-
 .../streaming/runtime/io/BarrierBufferTest.java |    8 +-
 .../runtime/io/BarrierTrackerTest.java          |    4 +-
 .../streaming/runtime/io/BufferSpillerTest.java |    3 +-
 .../runtime/io/StreamRecordWriterTest.java      |    7 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   11 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |    2 +-
 .../tez/runtime/TezRuntimeEnvironment.java      |   11 +-
 .../org/apache/flink/tez/runtime/TezTask.java   |    2 +-
 .../HashTableRecordWidthCombinations.java       |    3 +-
 .../flink/test/manual/MassiveStringSorting.java |    8 +-
 .../test/manual/MassiveStringValueSorting.java  |   19 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |    4 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   30 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |    4 +-
 pom.xml                                         |    2 +-
 tools/maven/checkstyle.xml                      |    2 +-
 201 files changed, 16226 insertions(+), 5096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9ec71d2..bbaf71a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -125,9 +125,14 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
 
 	/**
-	 * The key for the config parameter defining whether the memory manager allocates memory lazy.
+	 * The fraction of off-heap memory relative to the heap size.
 	 */
-	public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
+	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio";
+	
+	/**
+	 * The config parameter defining the memory allocation method (JVM heap or off-heap).
+	*/
+	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
 
 	/**
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
@@ -543,6 +548,11 @@ public final class ConfigConstants {
 	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
 
 	/**
+	 * The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap memory.
+	 */
+	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f;
+	
+	/**
 	 * Default number of buffers used in the network stack.
 	 */
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
new file mode 100644
index 0000000..0685d59
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * This class represents a piece of heap memory managed by Flink.
+ * The segment is backed by a byte array and features random put and get methods for the basic types,
+ * as well as compare and swap methods.
+ * <p>
+ * This class specialized byte access and byte copy calls for heap memory, while reusing the
+ * multi-byte type accesses and cross-segment operations from the MemorySegment.
+ * <p>
+ * Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+public final class HeapMemorySegment extends MemorySegment {
+
+	/** An extra reference to the heap memory, so we can let byte array checks fail 
+	 *  by the built-in checks automatically without extra checks */
+	private byte[] memory;
+
+	/**
+	 * Creates a new memory segment that represents the data in the given byte array.
+	 * The owner of this memory segment is null.
+	 *
+	 * @param memory The byte array that holds the data.
+	 */
+	HeapMemorySegment(byte[] memory) {
+		this(memory, null);
+	}
+	
+	/**
+	 * Creates a new memory segment that represents the data in the given byte array.
+	 * The memory segment references the given owner.
+	 *
+	 * @param memory The byte array that holds the data.
+	 * @param owner The owner referenced by the memory segment.
+	 */
+	HeapMemorySegment(byte[] memory, Object owner) {
+		super(Objects.requireNonNull(memory), owner);
+		this.memory = memory;
+	}
+	
+	// -------------------------------------------------------------------------
+	//  MemorySegment operations
+	// -------------------------------------------------------------------------
+
+	@Override
+	public void free() {
+		super.free();
+		this.memory = null;
+	}
+
+	@Override
+	public ByteBuffer wrap(int offset, int length) {
+		try {
+			return ByteBuffer.wrap(this.memory, offset, length);
+		}
+		catch (NullPointerException e) {
+			throw new IllegalStateException("segment has been freed");
+		}
+	}
+
+	/**
+	 * Gets the byte array that backs this memory segment.
+	 *
+	 * @return The byte array that backs this memory segment, or null, if the segment has been freed.
+	 */
+	public byte[] getArray() {
+		return this.heapMemory;
+	}
+	
+	// ------------------------------------------------------------------------
+	//                    Random Access get() and put() methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public final byte get(int index) {
+		return this.memory[index];
+	}
+
+	@Override
+	public final void put(int index, byte b) {
+		this.memory[index] = b;
+	}
+
+	@Override
+	public final void get(int index, byte[] dst) {
+		get(index, dst, 0, dst.length);
+	}
+
+	@Override
+	public final void put(int index, byte[] src) {
+		put(index, src, 0, src.length);
+	}
+
+	@Override
+	public final void get(int index, byte[] dst, int offset, int length) {
+		// system arraycopy does the boundary checks anyways, no need to check extra
+		System.arraycopy(this.memory, index, dst, offset, length);
+	}
+
+	@Override
+	public final void put(int index, byte[] src, int offset, int length) {
+		// system arraycopy does the boundary checks anyways, no need to check extra
+		System.arraycopy(src, offset, this.memory, index, length);
+	}
+
+	@Override
+	public final boolean getBoolean(int index) {
+		return this.memory[index] != 0;
+	}
+
+	@Override
+	public final void putBoolean(int index, boolean value) {
+		this.memory[index] = (byte) (value ? 1 : 0);
+	}
+
+	// -------------------------------------------------------------------------
+	//                     Bulk Read and Write Methods
+	// -------------------------------------------------------------------------
+
+	@Override
+	public final void get(DataOutput out, int offset, int length) throws IOException {
+		out.write(this.memory, offset, length);
+	}
+
+	@Override
+	public final void put(DataInput in, int offset, int length) throws IOException {
+		in.readFully(this.memory, offset, length);
+	}
+
+	@Override
+	public final void get(int offset, ByteBuffer target, int numBytes) {
+		// ByteBuffer performs the boundary checks
+		target.put(this.memory, offset, numBytes);
+	}
+
+	@Override
+	public final void put(int offset, ByteBuffer source, int numBytes) {
+		// ByteBuffer performs the boundary checks
+		source.get(this.memory, offset, numBytes);
+	}
+
+	// -------------------------------------------------------------------------
+	//                             Factoring
+	// -------------------------------------------------------------------------
+
+	/**
+	 * A memory segment factory that produces heap memory segments. Note that this factory does not
+	 * support to allocate off-heap memory.
+	 */
+	public static final class HeapMemorySegmentFactory implements MemorySegmentFactory.Factory {
+
+		@Override
+		public HeapMemorySegment wrap(byte[] memory) {
+			return new HeapMemorySegment(memory);
+		}
+
+		@Override
+		public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
+			return new HeapMemorySegment(new byte[size], owner);
+		}
+
+		@Override
+		public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
+			return new HeapMemorySegment(memory, owner);
+		}
+
+		@Override
+		public HeapMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
+			throw new UnsupportedOperationException(
+					"The MemorySegment factory was not initialized for off-heap memory.");
+		}
+
+		/** prevent external instantiation */
+		HeapMemorySegmentFactory() {}
+	};
+
+	public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
new file mode 100644
index 0000000..f68723b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of memory managed by Flink. The memory can be on-heap or off-heap,
+ * this is transparently handled by this class.
+ * <p>
+ * This class specialized byte access and byte copy calls for heap memory, while reusing the
+ * multi-byte type accesses and cross-segment operations from the MemorySegment.
+ * <p>
+ * This class subsumes the functionality of the {@link org.apache.flink.core.memory.HeapMemorySegment}, 
+ * but is a bit less efficient for operations on individual bytes.
+ * <p>
+ * Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+public final class HybridMemorySegment extends MemorySegment {
+	
+	/** The direct byte buffer that allocated the off-heap memory. This memory segment holds a reference
+	 * to that buffer, so as long as this memory segment lives, the memory will not be released. */
+	private final ByteBuffer offHeapBuffer;
+
+	/**
+	 * Creates a new memory segment that represents the memory backing the given direct byte buffer.
+	 * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
+	 * otherwise this method with throw an IllegalArgumentException.
+	 * <p>
+	 * The owner referenced by this memory segment is null.
+	 *
+	 * @param buffer The byte buffer whose memory is represented by this memory segment.
+	 * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+	 */
+	HybridMemorySegment(ByteBuffer buffer) {
+		this(buffer, null);
+	}
+	
+	/**
+	 * Creates a new memory segment that represents the memory backing the given direct byte buffer.
+	 * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
+	 * otherwise this method with throw an IllegalArgumentException.
+	 * <p>
+	 * The memory segment references the given owner.
+	 *
+	 * @param buffer The byte buffer whose memory is represented by this memory segment.
+	 * @param owner The owner references by this memory segment.
+	 * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+	 */
+	HybridMemorySegment(ByteBuffer buffer, Object owner) {
+		super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
+		this.offHeapBuffer = buffer;
+	}
+
+	/**
+	 * Creates a new memory segment that represents the memory of the byte array.
+	 * <p>
+	 * The owner referenced by this memory segment is null.
+	 *
+	 * @param buffer The byte array whose memory is represented by this memory segment.
+	 */
+	HybridMemorySegment(byte[] buffer) {
+		this(buffer, null);
+	}
+
+	/**
+	 * Creates a new memory segment that represents the memory of the byte array.
+	 * <p>
+	 * The memory segment references the given owner.
+	 *
+	 * @param buffer The byte array whose memory is represented by this memory segment.
+	 * @param owner The owner references by this memory segment.
+	 */
+	HybridMemorySegment(byte[] buffer, Object owner) {
+		super(buffer, owner);
+		this.offHeapBuffer = null;
+	}
+
+	// -------------------------------------------------------------------------
+	//  MemorySegment operations
+	// -------------------------------------------------------------------------
+	
+	public byte[] getArray() {
+		if (heapMemory != null) {
+			return heapMemory;
+		} else {
+			throw new IllegalStateException("Memory segment does not represent heap memory");
+		}
+	}
+
+	/**
+	 * Gets the buffer that owns the memory of this memory segment.
+	 *
+	 * @return The byte buffer that owns the memory of this memory segment.
+	 */
+	public ByteBuffer getOffHeapBuffer() {
+		if (offHeapBuffer != null) {
+			return offHeapBuffer;
+		} else {
+			throw new IllegalStateException("Memory segment does not represent off heap memory");
+		}
+	}
+
+	@Override
+	public ByteBuffer wrap(int offset, int length) {
+		if (address <= addressLimit) {
+			if (heapMemory != null) {
+				return ByteBuffer.wrap(heapMemory, offset, length);
+			}
+			else {
+				try {
+					ByteBuffer wrapper = offHeapBuffer.duplicate();
+					wrapper.limit(offset + length);
+					wrapper.position(offset);
+					return wrapper;
+				}
+				catch (IllegalArgumentException e) {
+					throw new IndexOutOfBoundsException();
+				}
+			}
+		}
+		else {
+			throw new IllegalStateException("segment has been freed");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Random Access get() and put() methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public byte get(int index) {
+		final long pos = address + index;
+		if (index >= 0 && pos < addressLimit) {
+			return UNSAFE.getByte(heapMemory, pos);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	@Override
+	public void put(int index, byte b) {
+		final long pos = address + index;
+		if (index >= 0 && pos < addressLimit) {
+			UNSAFE.putByte(heapMemory, pos, b);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	@Override
+	public void get(int index, byte[] dst) {
+		get(index, dst, 0, dst.length);
+	}
+
+	@Override
+	public void put(int index, byte[] src) {
+		put(index, src, 0, src.length);
+	}
+
+	@Override
+	public void get(int index, byte[] dst, int offset, int length) {
+		// check the byte array offset and length and the status
+		if ( (offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		final long pos = address + index;
+		if (index >= 0 && pos <= addressLimit - length) {
+			final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+			UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	@Override
+	public void put(int index, byte[] src, int offset, int length) {
+		// check the byte array offset and length
+		if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
+			throw new IndexOutOfBoundsException();
+		}
+		
+		final long pos = address + index;
+
+		if (index >= 0 && pos <= addressLimit - length) {
+			final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+			UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
+		}
+		else if (address > addressLimit) {
+			throw new IllegalStateException("segment has been freed");
+		}
+		else {
+			// index is in fact invalid
+			throw new IndexOutOfBoundsException();
+		}
+	}
+
+	@Override
+	public boolean getBoolean(int index) {
+		return get(index) != 0;
+	}
+
+	@Override
+	public void putBoolean(int index, boolean value) {
+		put(index, (byte) (value ? 1 : 0));
+	}
+
+	// -------------------------------------------------------------------------
+	//  Bulk Read and Write Methods
+	// -------------------------------------------------------------------------
+
+	@Override
+	public final void get(DataOutput out, int offset, int length) throws IOException {
+		if (address <= addressLimit) {
+			if (heapMemory != null) {
+				out.write(heapMemory, offset, length);
+			}
+			else {
+				while (length >= 8) {
+					out.writeLong(getLongBigEndian(offset));
+					offset += 8;
+					length -= 8;
+				}
+		
+				while (length > 0) {
+					out.writeByte(get(offset));
+					offset++;
+					length--;
+				}
+			}
+		}
+		else {
+			throw new IllegalStateException("segment has been freed");
+		}
+	}
+
+	@Override
+	public final void put(DataInput in, int offset, int length) throws IOException {
+		if (address <= addressLimit) {
+			if (heapMemory != null) {
+				in.readFully(heapMemory, offset, length);
+			}
+			else {
+				while (length >= 8) {
+					putLongBigEndian(offset, in.readLong());
+					offset += 8;
+					length -= 8;
+				}
+				while (length > 0) {
+					put(offset, in.readByte());
+					offset++;
+					length--;
+				}
+			}
+		}
+		else {
+			throw new IllegalStateException("segment has been freed");
+		}
+	}
+
+	@Override
+	public final void get(int offset, ByteBuffer target, int numBytes) {
+		// check the byte array offset and length
+		if ((offset | numBytes | (offset + numBytes)) < 0) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		final int targetOffset = target.position();
+		final int remaining = target.remaining();
+
+		if (remaining < numBytes) {
+			throw new BufferOverflowException();
+		}
+
+		if (target.isDirect()) {
+			// copy to the target memory directly
+			final long targetPointer = getAddress(target) + targetOffset;
+			final long sourcePointer = address + offset;
+
+			if (sourcePointer <= addressLimit - numBytes) {
+				UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
+				target.position(targetOffset + numBytes);
+			}
+			else if (address > addressLimit) {
+				throw new IllegalStateException("segment has been freed");
+			}
+			else {
+				throw new IndexOutOfBoundsException();
+			}
+		}
+		else if (target.hasArray()) {
+			// move directly into the byte array
+			get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
+
+			// this must be after the get() call to ensue that the byte buffer is not
+			// modified in case the call fails
+			target.position(targetOffset + numBytes);
+		}
+		else {
+			// neither heap buffer nor direct buffer
+			while (target.hasRemaining()) {
+				target.put(get(offset++));
+			}
+		}
+	}
+
+	@Override
+	public final void put(int offset, ByteBuffer source, int numBytes) {
+		// check the byte array offset and length
+		if ((offset | numBytes | (offset + numBytes)) < 0) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		final int sourceOffset = source.position();
+		final int remaining = source.remaining();
+
+		if (remaining < numBytes) {
+			throw new BufferUnderflowException();
+		}
+
+		if (source.isDirect()) {
+			// copy to the target memory directly
+			final long sourcePointer = getAddress(source) + sourceOffset;
+			final long targetPointer = address + offset;
+
+			if (targetPointer <= addressLimit - numBytes) {
+				UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
+				source.position(sourceOffset + numBytes);
+			}
+			else if (address > addressLimit) {
+				throw new IllegalStateException("segment has been freed");
+			}
+			else {
+				throw new IndexOutOfBoundsException();
+			}
+		}
+		else if (source.hasArray()) {
+			// move directly into the byte array
+			put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
+
+			// this must be after the get() call to ensue that the byte buffer is not
+			// modified in case the call fails
+			source.position(sourceOffset + numBytes);
+		}
+		else {
+			// neither heap buffer nor direct buffer
+			while (source.hasRemaining()) {
+				put(offset++, source.get());
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utilities for native memory accesses and checks
+	// --------------------------------------------------------------------------------------------
+
+	/** The reflection fields with which we access the off-heap pointer from direct ByteBuffers */
+	private static final Field ADDRESS_FIELD;
+
+	static {
+		try {
+			ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
+			ADDRESS_FIELD.setAccessible(true);
+		}
+		catch (Throwable t) {
+			throw new RuntimeException(
+					"Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);
+		}
+	}
+
+	private static long getAddress(ByteBuffer buffer) {
+		if (buffer == null) {
+			throw new NullPointerException("buffer is null");
+		}
+		try {
+			return (Long) ADDRESS_FIELD.get(buffer);
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Could not access direct byte buffer address.", t);
+		}
+	}
+	
+	private static long checkBufferAndGetAddress(ByteBuffer buffer) {
+		if (buffer == null) {
+			throw new NullPointerException("buffer is null");
+		}
+		if (!buffer.isDirect()) {
+			throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer.");
+		}
+		return getAddress(buffer);
+	}
+
+	// -------------------------------------------------------------------------
+	//  Factoring
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Base factory for hybrid memory segments.
+	 */
+	public static final class HybridMemorySegmentFactory implements MemorySegmentFactory.Factory {
+		
+		@Override
+		public HybridMemorySegment wrap(byte[] memory) {
+			return new HybridMemorySegment(memory);
+		}
+
+		@Override
+		public HybridMemorySegment allocateUnpooledSegment(int size, Object owner) {
+			return new HybridMemorySegment(new byte[size], owner);
+		}
+
+		@Override
+		public HybridMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
+			return new HybridMemorySegment(memory, owner);
+		}
+
+		@Override
+		public HybridMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
+			return new HybridMemorySegment(memory, owner);
+		}
+
+		/** prevent external instantiation */
+		HybridMemorySegmentFactory() {}
+	};
+
+	public static final HybridMemorySegmentFactory FACTORY = new HybridMemorySegmentFactory();
+}