You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:59:05 UTC
[15/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant
of the managed memory
[FLINK-1320] [core] Add an off-heap variant of the managed memory
This closes #1093
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655a891d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655a891d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655a891d
Branch: refs/heads/master
Commit: 655a891d929db9d858bb5c2edf54419f2b0d3ace
Parents: 1800434
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 30 22:36:46 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 8 20:58:05 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 14 +-
.../flink/core/memory/HeapMemorySegment.java | 203 ++
.../flink/core/memory/HybridMemorySegment.java | 466 ++++
.../apache/flink/core/memory/MemorySegment.java | 938 ++++---
.../flink/core/memory/MemorySegmentFactory.java | 211 ++
.../apache/flink/core/memory/MemoryType.java | 35 +
.../apache/flink/core/memory/MemoryUtils.java | 34 +-
.../common/typeutils/ComparatorTestBase.java | 10 +-
.../flink/core/memory/CrossSegmentTypeTest.java | 356 +++
.../core/memory/EndiannessAccessChecks.java | 183 ++
.../core/memory/HeapMemorySegmentTest.java | 71 +
.../memory/HybridOffHeapMemorySegmentTest.java | 84 +
.../memory/HybridOnHeapMemorySegmentTest.java | 82 +
.../core/memory/MemorySegmentChecksTest.java | 135 +
.../core/memory/MemorySegmentTestBase.java | 2571 ++++++++++++++++++
.../memory/MemorySegmentUndersizedTest.java | 1367 ++++++++++
.../memory/OperationsOnFreedSegmentTest.java | 1195 ++++++++
.../benchmarks/CoreMemorySegmentOutView.java | 360 +++
.../LongSerializationSpeedBenchmark.java | 232 ++
.../benchmarks/MemorySegmentSpeedBenchmark.java | 1633 +++++++++++
.../benchmarks/PureHeapMemorySegment.java | 466 ++++
.../PureHeapMemorySegmentOutView.java | 359 +++
.../benchmarks/PureHybridMemorySegment.java | 887 ++++++
.../PureHybridMemorySegmentOutView.java | 359 +++
.../benchmarks/PureOffHeapMemorySegment.java | 790 ++++++
.../PureOffHeapMemorySegmentOutView.java | 359 +++
.../StringSerializationSpeedBenchmark.java | 207 ++
.../apache/flink/types/NormalizableKeyTest.java | 26 +-
.../flink/runtime/execution/Environment.java | 2 +-
.../runtime/io/disk/FileChannelInputView.java | 4 +-
.../runtime/io/disk/FileChannelOutputView.java | 4 +-
.../runtime/io/disk/RandomAccessInputView.java | 2 +-
.../runtime/io/disk/RandomAccessOutputView.java | 2 +-
.../io/disk/SeekableFileChannelInputView.java | 4 +-
.../io/disk/SimpleCollectingOutputView.java | 2 +-
.../flink/runtime/io/disk/SpillingBuffer.java | 2 +-
.../disk/iomanager/ChannelReaderInputView.java | 2 +-
.../disk/iomanager/ChannelWriterOutputView.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 3 +-
.../AdaptiveSpanningRecordDeserializer.java | 6 +-
.../api/serialization/EventSerializer.java | 6 +-
...llingAdaptiveSpanningRecordDeserializer.java | 6 +-
.../flink/runtime/io/network/buffer/Buffer.java | 5 +-
.../io/network/buffer/NetworkBufferPool.java | 30 +-
.../netty/PartitionRequestClientHandler.java | 4 +-
.../SpilledSubpartitionViewSyncIO.java | 3 +-
.../iterative/io/SerializedUpdateBuffer.java | 4 +-
.../runtime/memory/AbstractPagedInputView.java | 568 ++++
.../runtime/memory/AbstractPagedOutputView.java | 414 +++
.../runtime/memory/ListMemorySegmentSource.java | 48 +
.../memory/MemoryAllocationException.java | 44 +
.../flink/runtime/memory/MemoryManager.java | 700 +++++
.../memorymanager/AbstractPagedInputView.java | 566 ----
.../memorymanager/AbstractPagedOutputView.java | 416 ---
.../memorymanager/CheckedMemorySegment.java | 407 ---
.../memorymanager/DefaultMemoryManager.java | 490 ----
.../memorymanager/ListMemorySegmentSource.java | 48 -
.../MemoryAllocationException.java | 44 -
.../runtime/memorymanager/MemoryManager.java | 134 -
.../memorymanager/SimpleMemorySegment.java | 329 ---
.../memorymanager/UnsafeMemorySegment.java | 391 ---
.../operators/AbstractOuterJoinDriver.java | 2 +-
.../flink/runtime/operators/CrossDriver.java | 2 +-
.../runtime/operators/FullOuterJoinDriver.java | 2 +-
.../operators/GroupReduceCombineDriver.java | 2 +-
.../flink/runtime/operators/JoinDriver.java | 2 +-
.../runtime/operators/LeftOuterJoinDriver.java | 2 +-
.../runtime/operators/PactTaskContext.java | 2 +-
.../runtime/operators/ReduceCombineDriver.java | 2 +-
.../runtime/operators/RegularPactTask.java | 2 +-
.../runtime/operators/RightOuterJoinDriver.java | 2 +-
.../flink/runtime/operators/TempBarrier.java | 6 +-
.../chaining/GroupCombineChainedDriver.java | 2 +-
.../SynchronousChainedCombineDriver.java | 2 +-
.../operators/hash/CompactingHashTable.java | 2 +-
.../operators/hash/HashMatchIteratorBase.java | 4 +-
.../runtime/operators/hash/HashPartition.java | 4 +-
.../operators/hash/InMemoryPartition.java | 6 +-
.../NonReusingBuildFirstHashMatchIterator.java | 4 +-
...ngBuildFirstReOpenableHashMatchIterator.java | 4 +-
.../NonReusingBuildSecondHashMatchIterator.java | 4 +-
...gBuildSecondReOpenableHashMatchIterator.java | 4 +-
.../ReusingBuildFirstHashMatchIterator.java | 4 +-
...ngBuildFirstReOpenableHashMatchIterator.java | 4 +-
.../ReusingBuildSecondHashMatchIterator.java | 4 +-
...gBuildSecondReOpenableHashMatchIterator.java | 4 +-
.../AbstractBlockResettableIterator.java | 6 +-
.../BlockResettableMutableObjectIterator.java | 4 +-
.../NonReusingBlockResettableIterator.java | 4 +-
.../ReusingBlockResettableIterator.java | 4 +-
.../resettable/SpillingResettableIterator.java | 6 +-
...SpillingResettableMutableObjectIterator.java | 6 +-
.../sort/AbstractMergeInnerJoinIterator.java | 4 +-
.../operators/sort/AbstractMergeIterator.java | 4 +-
.../sort/AbstractMergeOuterJoinIterator.java | 4 +-
.../sort/CombiningUnilateralSortMerger.java | 4 +-
.../operators/sort/FixedLengthRecordSorter.java | 12 +-
.../operators/sort/LargeRecordHandler.java | 2 +-
.../sort/NonReusingMergeInnerJoinIterator.java | 4 +-
.../sort/NonReusingMergeOuterJoinIterator.java | 4 +-
.../operators/sort/NormalizedKeySorter.java | 18 +-
.../sort/ReusingMergeInnerJoinIterator.java | 4 +-
.../sort/ReusingMergeOuterJoinIterator.java | 4 +-
.../operators/sort/UnilateralSortMerger.java | 4 +-
.../operators/util/CoGroupTaskIterator.java | 2 +-
.../operators/util/JoinTaskIterator.java | 2 +-
.../runtime/taskmanager/RuntimeEnvironment.java | 2 +-
.../apache/flink/runtime/taskmanager/Task.java | 2 +-
.../runtime/util/EnvironmentInformation.java | 19 +-
.../NetworkEnvironmentConfiguration.scala | 4 +-
.../flink/runtime/taskmanager/TaskManager.scala | 99 +-
.../flink/runtime/io/disk/ChannelViewsTest.java | 6 +-
.../io/disk/FileChannelStreamsITCase.java | 7 +-
.../runtime/io/disk/FileChannelStreamsTest.java | 8 +-
.../disk/SeekableFileChannelInputViewTest.java | 24 +-
.../runtime/io/disk/SpillingBufferTest.java | 31 +-
.../AsynchronousFileIOChannelTest.java | 8 +-
.../BufferFileWriterFileSegmentReaderTest.java | 7 +-
.../iomanager/BufferFileWriterReaderTest.java | 4 +-
.../io/disk/iomanager/IOManagerAsyncTest.java | 5 +-
.../io/disk/iomanager/IOManagerITCase.java | 8 +-
.../IOManagerPerformanceBenchmark.java | 6 +-
.../io/network/NetworkEnvironmentTest.java | 4 +-
.../api/serialization/PagedViewsTest.java | 15 +-
.../SpanningRecordSerializationTest.java | 4 +-
.../SpanningRecordSerializerTest.java | 7 +-
.../io/network/api/writer/RecordWriterTest.java | 9 +-
.../network/buffer/BufferPoolFactoryTest.java | 3 +-
.../runtime/io/network/buffer/BufferTest.java | 5 +-
.../io/network/buffer/LocalBufferPoolTest.java | 3 +-
.../network/buffer/NetworkBufferPoolTest.java | 5 +-
.../netty/NettyMessageSerializationTest.java | 4 +-
.../IteratorWrappingTestSingleInputGate.java | 4 +-
.../consumer/LocalInputChannelTest.java | 3 +-
.../partition/consumer/SingleInputGateTest.java | 5 +-
.../network/serialization/LargeRecordsTest.java | 6 +-
.../io/network/util/TestBufferFactory.java | 6 +-
.../memory/MemoryManagerLazyAllocationTest.java | 7 +-
.../flink/runtime/memory/MemoryManagerTest.java | 7 +-
.../runtime/memory/MemorySegmentSimpleTest.java | 576 ++++
.../memory/MemorySegmentSpeedBenchmark.java | 867 ------
.../flink/runtime/memory/MemorySegmentTest.java | 575 ----
.../operators/drivers/TestTaskContext.java | 7 +-
.../operators/hash/CompactingHashTableTest.java | 3 +-
.../runtime/operators/hash/HashTableITCase.java | 7 +-
.../hash/HashTablePerformanceComparison.java | 3 +-
.../runtime/operators/hash/HashTableTest.java | 3 +-
.../operators/hash/MemoryHashTableTest.java | 3 +-
.../MutableHashTablePerformanceBenchmark.java | 8 +-
.../hash/NonReusingHashMatchIteratorITCase.java | 5 +-
.../NonReusingReOpenableHashTableITCase.java | 8 +-
.../hash/ReusingHashMatchIteratorITCase.java | 5 +-
.../hash/ReusingReOpenableHashTableITCase.java | 8 +-
...lockResettableMutableObjectIteratorTest.java | 21 +-
.../NonReusingBlockResettableIteratorTest.java | 11 +-
.../ReusingBlockResettableIteratorTest.java | 13 +-
.../SpillingResettableIteratorTest.java | 7 +-
...lingResettableMutableObjectIteratorTest.java | 6 +-
...bstractSortMergeOuterJoinIteratorITCase.java | 8 +-
.../CombiningUnilateralSortMergerITCase.java | 7 +-
.../operators/sort/ExternalSortITCase.java | 5 +-
.../sort/ExternalSortLargeRecordsITCase.java | 5 +-
.../sort/FixedLengthRecordSorterTest.java | 14 +-
.../sort/LargeRecordHandlerITCase.java | 28 +-
.../operators/sort/LargeRecordHandlerTest.java | 21 +-
...ReusingSortMergeInnerJoinIteratorITCase.java | 17 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +-
.../operators/sort/NormalizedKeySorterTest.java | 15 +-
...ReusingSortMergeInnerJoinIteratorITCase.java | 17 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +-
.../testutils/BinaryOperatorTestBase.java | 5 +-
.../operators/testutils/DriverTestBase.java | 5 +-
.../operators/testutils/MockEnvironment.java | 9 +-
.../operators/testutils/TaskTestBase.java | 2 +-
.../testutils/UnaryOperatorTestBase.java | 5 +-
.../runtime/operators/util/BloomFilterTest.java | 5 +-
.../operators/util/HashVsSortMiniBenchmark.java | 6 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 2 +-
...askManagerComponentsStartupShutdownTest.java | 8 +-
.../flink/runtime/taskmanager/TaskTest.java | 2 +-
.../util/DataInputOutputSerializerTest.java | 4 +-
.../testingUtils/TestingTaskManager.scala | 4 +-
.../streaming/runtime/io/BufferSpiller.java | 3 +-
.../consumer/StreamTestSingleInputGate.java | 18 +-
.../io/BarrierBufferMassiveRandomTest.java | 5 +-
.../streaming/runtime/io/BarrierBufferTest.java | 8 +-
.../runtime/io/BarrierTrackerTest.java | 4 +-
.../streaming/runtime/io/BufferSpillerTest.java | 3 +-
.../runtime/io/StreamRecordWriterTest.java | 7 +-
.../runtime/tasks/StreamMockEnvironment.java | 11 +-
.../runtime/tasks/StreamTaskTestHarness.java | 2 +-
.../tez/runtime/TezRuntimeEnvironment.java | 11 +-
.../org/apache/flink/tez/runtime/TezTask.java | 2 +-
.../HashTableRecordWidthCombinations.java | 3 +-
.../flink/test/manual/MassiveStringSorting.java | 8 +-
.../test/manual/MassiveStringValueSorting.java | 19 +-
.../misc/MassiveCaseClassSortingITCase.scala | 4 +-
.../scala/runtime/CaseClassComparatorTest.scala | 30 +-
.../org/apache/flink/yarn/YarnTaskManager.scala | 4 +-
pom.xml | 2 +-
tools/maven/checkstyle.xml | 2 +-
201 files changed, 16226 insertions(+), 5096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9ec71d2..bbaf71a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -125,9 +125,14 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
/**
- * The key for the config parameter defining whether the memory manager allocates memory lazy.
+ * The fraction of off-heap memory relative to the heap size.
*/
- public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
+ public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio";
+
+ /**
+ * The config parameter defining the memory allocation method (JVM heap or off-heap).
+ */
+ public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
/**
* The config parameter defining the number of buffers used in the network stack. This defines the
@@ -543,6 +548,11 @@ public final class ConfigConstants {
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
/**
+ * The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap memory.
+ */
+ public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f;
+
+ /**
* Default number of buffers used in the network stack.
*/
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
new file mode 100644
index 0000000..0685d59
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * This class represents a piece of heap memory managed by Flink.
+ * The segment is backed by a byte array and features random put and get methods for the basic types,
+ * as well as compare and swap methods.
+ * <p>
+ * This class specialized byte access and byte copy calls for heap memory, while reusing the
+ * multi-byte type accesses and cross-segment operations from the MemorySegment.
+ * <p>
+ * Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+public final class HeapMemorySegment extends MemorySegment {
+
+ /** An extra reference to the heap memory, so we can let byte array checks fail
+ * by the built-in checks automatically without extra checks */
+ private byte[] memory;
+
+ /**
+ * Creates a new memory segment that represents the data in the given byte array.
+ * The owner of this memory segment is null.
+ *
+ * @param memory The byte array that holds the data.
+ */
+ HeapMemorySegment(byte[] memory) {
+ this(memory, null);
+ }
+
+ /**
+ * Creates a new memory segment that represents the data in the given byte array.
+ * The memory segment references the given owner.
+ *
+ * @param memory The byte array that holds the data.
+ * @param owner The owner referenced by the memory segment.
+ */
+ HeapMemorySegment(byte[] memory, Object owner) {
+ super(Objects.requireNonNull(memory), owner);
+ this.memory = memory;
+ }
+
+ // -------------------------------------------------------------------------
+ // MemorySegment operations
+ // -------------------------------------------------------------------------
+
+ @Override
+ public void free() {
+ super.free();
+ this.memory = null;
+ }
+
+ @Override
+ public ByteBuffer wrap(int offset, int length) {
+ try {
+ return ByteBuffer.wrap(this.memory, offset, length);
+ }
+ catch (NullPointerException e) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ }
+
+ /**
+ * Gets the byte array that backs this memory segment.
+ *
+ * @return The byte array that backs this memory segment, or null, if the segment has been freed.
+ */
+ public byte[] getArray() {
+ return this.heapMemory;
+ }
+
+ // ------------------------------------------------------------------------
+ // Random Access get() and put() methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final byte get(int index) {
+ return this.memory[index];
+ }
+
+ @Override
+ public final void put(int index, byte b) {
+ this.memory[index] = b;
+ }
+
+ @Override
+ public final void get(int index, byte[] dst) {
+ get(index, dst, 0, dst.length);
+ }
+
+ @Override
+ public final void put(int index, byte[] src) {
+ put(index, src, 0, src.length);
+ }
+
+ @Override
+ public final void get(int index, byte[] dst, int offset, int length) {
+ // system arraycopy does the boundary checks anyways, no need to check extra
+ System.arraycopy(this.memory, index, dst, offset, length);
+ }
+
+ @Override
+ public final void put(int index, byte[] src, int offset, int length) {
+ // system arraycopy does the boundary checks anyways, no need to check extra
+ System.arraycopy(src, offset, this.memory, index, length);
+ }
+
+ @Override
+ public final boolean getBoolean(int index) {
+ return this.memory[index] != 0;
+ }
+
+ @Override
+ public final void putBoolean(int index, boolean value) {
+ this.memory[index] = (byte) (value ? 1 : 0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Bulk Read and Write Methods
+ // -------------------------------------------------------------------------
+
+ @Override
+ public final void get(DataOutput out, int offset, int length) throws IOException {
+ out.write(this.memory, offset, length);
+ }
+
+ @Override
+ public final void put(DataInput in, int offset, int length) throws IOException {
+ in.readFully(this.memory, offset, length);
+ }
+
+ @Override
+ public final void get(int offset, ByteBuffer target, int numBytes) {
+ // ByteBuffer performs the boundary checks
+ target.put(this.memory, offset, numBytes);
+ }
+
+ @Override
+ public final void put(int offset, ByteBuffer source, int numBytes) {
+ // ByteBuffer performs the boundary checks
+ source.get(this.memory, offset, numBytes);
+ }
+
+ // -------------------------------------------------------------------------
+ // Factoring
+ // -------------------------------------------------------------------------
+
+ /**
+ * A memory segment factory that produces heap memory segments. Note that this factory does not
+ * support to allocate off-heap memory.
+ */
+ public static final class HeapMemorySegmentFactory implements MemorySegmentFactory.Factory {
+
+ @Override
+ public HeapMemorySegment wrap(byte[] memory) {
+ return new HeapMemorySegment(memory);
+ }
+
+ @Override
+ public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
+ return new HeapMemorySegment(new byte[size], owner);
+ }
+
+ @Override
+ public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
+ return new HeapMemorySegment(memory, owner);
+ }
+
+ @Override
+ public HeapMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
+ throw new UnsupportedOperationException(
+ "The MemorySegment factory was not initialized for off-heap memory.");
+ }
+
+ /** prevent external instantiation */
+ HeapMemorySegmentFactory() {}
+ };
+
+ public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
new file mode 100644
index 0000000..f68723b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of memory managed by Flink. The memory can be on-heap or off-heap,
+ * this is transparently handled by this class.
+ * <p>
+ * This class specialized byte access and byte copy calls for heap memory, while reusing the
+ * multi-byte type accesses and cross-segment operations from the MemorySegment.
+ * <p>
+ * This class subsumes the functionality of the {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * but is a bit less efficient for operations on individual bytes.
+ * <p>
+ * Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+public final class HybridMemorySegment extends MemorySegment {
+
+ /** The direct byte buffer that allocated the off-heap memory. This memory segment holds a reference
+ * to that buffer, so as long as this memory segment lives, the memory will not be released. */
+ private final ByteBuffer offHeapBuffer;
+
+ /**
+ * Creates a new memory segment that represents the memory backing the given direct byte buffer.
+ * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
+ * otherwise this method with throw an IllegalArgumentException.
+ * <p>
+ * The owner referenced by this memory segment is null.
+ *
+ * @param buffer The byte buffer whose memory is represented by this memory segment.
+ * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+ */
+ HybridMemorySegment(ByteBuffer buffer) {
+ this(buffer, null);
+ }
+
+ /**
+ * Creates a new memory segment that represents the memory backing the given direct byte buffer.
+ * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
+ * otherwise this method with throw an IllegalArgumentException.
+ * <p>
+ * The memory segment references the given owner.
+ *
+ * @param buffer The byte buffer whose memory is represented by this memory segment.
+ * @param owner The owner references by this memory segment.
+ * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+ */
+ HybridMemorySegment(ByteBuffer buffer, Object owner) {
+ super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
+ this.offHeapBuffer = buffer;
+ }
+
+ /**
+ * Creates a new memory segment that represents the memory of the byte array.
+ * <p>
+ * The owner referenced by this memory segment is null.
+ *
+ * @param buffer The byte array whose memory is represented by this memory segment.
+ */
+ HybridMemorySegment(byte[] buffer) {
+ this(buffer, null);
+ }
+
+ /**
+ * Creates a new memory segment that represents the memory of the byte array.
+ * <p>
+ * The memory segment references the given owner.
+ *
+ * @param buffer The byte array whose memory is represented by this memory segment.
+ * @param owner The owner references by this memory segment.
+ */
+ HybridMemorySegment(byte[] buffer, Object owner) {
+ super(buffer, owner);
+ this.offHeapBuffer = null;
+ }
+
+ // -------------------------------------------------------------------------
+ // MemorySegment operations
+ // -------------------------------------------------------------------------
+
+ public byte[] getArray() {
+ if (heapMemory != null) {
+ return heapMemory;
+ } else {
+ throw new IllegalStateException("Memory segment does not represent heap memory");
+ }
+ }
+
+ /**
+ * Gets the buffer that owns the memory of this memory segment.
+ *
+ * @return The byte buffer that owns the memory of this memory segment.
+ */
+ public ByteBuffer getOffHeapBuffer() {
+ if (offHeapBuffer != null) {
+ return offHeapBuffer;
+ } else {
+ throw new IllegalStateException("Memory segment does not represent off heap memory");
+ }
+ }
+
+ @Override
+ public ByteBuffer wrap(int offset, int length) {
+ if (address <= addressLimit) {
+ if (heapMemory != null) {
+ return ByteBuffer.wrap(heapMemory, offset, length);
+ }
+ else {
+ try {
+ ByteBuffer wrapper = offHeapBuffer.duplicate();
+ wrapper.limit(offset + length);
+ wrapper.position(offset);
+ return wrapper;
+ }
+ catch (IllegalArgumentException e) {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+ }
+ else {
+ throw new IllegalStateException("segment has been freed");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Random Access get() and put() methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public byte get(int index) {
+ final long pos = address + index;
+ if (index >= 0 && pos < addressLimit) {
+ return UNSAFE.getByte(heapMemory, pos);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ // index is in fact invalid
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ @Override
+ public void put(int index, byte b) {
+ final long pos = address + index;
+ if (index >= 0 && pos < addressLimit) {
+ UNSAFE.putByte(heapMemory, pos, b);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ // index is in fact invalid
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ @Override
+ public void get(int index, byte[] dst) {
+ get(index, dst, 0, dst.length);
+ }
+
+ @Override
+ public void put(int index, byte[] src) {
+ put(index, src, 0, src.length);
+ }
+
+ @Override
+ public void get(int index, byte[] dst, int offset, int length) {
+ // check the byte array offset and length and the status
+ if ( (offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ final long pos = address + index;
+ if (index >= 0 && pos <= addressLimit - length) {
+ final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+ UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ // index is in fact invalid
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ @Override
+ public void put(int index, byte[] src, int offset, int length) {
+ // check the byte array offset and length
+ if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ final long pos = address + index;
+
+ if (index >= 0 && pos <= addressLimit - length) {
+ final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+ UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ // index is in fact invalid
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ @Override
+ public boolean getBoolean(int index) {
+ return get(index) != 0;
+ }
+
+ @Override
+ public void putBoolean(int index, boolean value) {
+ put(index, (byte) (value ? 1 : 0));
+ }
+
+ // -------------------------------------------------------------------------
+ // Bulk Read and Write Methods
+ // -------------------------------------------------------------------------
+
+ @Override
+ public final void get(DataOutput out, int offset, int length) throws IOException {
+ if (address <= addressLimit) {
+ if (heapMemory != null) {
+ out.write(heapMemory, offset, length);
+ }
+ else {
+ while (length >= 8) {
+ out.writeLong(getLongBigEndian(offset));
+ offset += 8;
+ length -= 8;
+ }
+
+ while (length > 0) {
+ out.writeByte(get(offset));
+ offset++;
+ length--;
+ }
+ }
+ }
+ else {
+ throw new IllegalStateException("segment has been freed");
+ }
+ }
+
+ @Override
+ public final void put(DataInput in, int offset, int length) throws IOException {
+ if (address <= addressLimit) {
+ if (heapMemory != null) {
+ in.readFully(heapMemory, offset, length);
+ }
+ else {
+ while (length >= 8) {
+ putLongBigEndian(offset, in.readLong());
+ offset += 8;
+ length -= 8;
+ }
+ while (length > 0) {
+ put(offset, in.readByte());
+ offset++;
+ length--;
+ }
+ }
+ }
+ else {
+ throw new IllegalStateException("segment has been freed");
+ }
+ }
+
+ @Override
+ public final void get(int offset, ByteBuffer target, int numBytes) {
+ // check the byte array offset and length
+ if ((offset | numBytes | (offset + numBytes)) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ final int targetOffset = target.position();
+ final int remaining = target.remaining();
+
+ if (remaining < numBytes) {
+ throw new BufferOverflowException();
+ }
+
+ if (target.isDirect()) {
+ // copy to the target memory directly
+ final long targetPointer = getAddress(target) + targetOffset;
+ final long sourcePointer = address + offset;
+
+ if (sourcePointer <= addressLimit - numBytes) {
+ UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
+ target.position(targetOffset + numBytes);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+ else if (target.hasArray()) {
+ // move directly into the byte array
+ get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);
+
+ // this must be after the get() call to ensue that the byte buffer is not
+ // modified in case the call fails
+ target.position(targetOffset + numBytes);
+ }
+ else {
+ // neither heap buffer nor direct buffer
+ while (target.hasRemaining()) {
+ target.put(get(offset++));
+ }
+ }
+ }
+
+ @Override
+ public final void put(int offset, ByteBuffer source, int numBytes) {
+ // check the byte array offset and length
+ if ((offset | numBytes | (offset + numBytes)) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ final int sourceOffset = source.position();
+ final int remaining = source.remaining();
+
+ if (remaining < numBytes) {
+ throw new BufferUnderflowException();
+ }
+
+ if (source.isDirect()) {
+ // copy to the target memory directly
+ final long sourcePointer = getAddress(source) + sourceOffset;
+ final long targetPointer = address + offset;
+
+ if (targetPointer <= addressLimit - numBytes) {
+ UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
+ source.position(sourceOffset + numBytes);
+ }
+ else if (address > addressLimit) {
+ throw new IllegalStateException("segment has been freed");
+ }
+ else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+ else if (source.hasArray()) {
+ // move directly into the byte array
+ put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);
+
+ // this must be after the get() call to ensue that the byte buffer is not
+ // modified in case the call fails
+ source.position(sourceOffset + numBytes);
+ }
+ else {
+ // neither heap buffer nor direct buffer
+ while (source.hasRemaining()) {
+ put(offset++, source.get());
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities for native memory accesses and checks
+ // --------------------------------------------------------------------------------------------
+
+ /** The reflection fields with which we access the off-heap pointer from direct ByteBuffers */
+ private static final Field ADDRESS_FIELD;
+
+ static {
+ try {
+ ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
+ ADDRESS_FIELD.setAccessible(true);
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(
+ "Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);
+ }
+ }
+
+ private static long getAddress(ByteBuffer buffer) {
+ if (buffer == null) {
+ throw new NullPointerException("buffer is null");
+ }
+ try {
+ return (Long) ADDRESS_FIELD.get(buffer);
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("Could not access direct byte buffer address.", t);
+ }
+ }
+
+ private static long checkBufferAndGetAddress(ByteBuffer buffer) {
+ if (buffer == null) {
+ throw new NullPointerException("buffer is null");
+ }
+ if (!buffer.isDirect()) {
+ throw new IllegalArgumentException("Can't initialize from non-direct ByteBuffer.");
+ }
+ return getAddress(buffer);
+ }
+
+ // -------------------------------------------------------------------------
+ // Factoring
+ // -------------------------------------------------------------------------
+
+ /**
+ * Base factory for hybrid memory segments.
+ */
+ public static final class HybridMemorySegmentFactory implements MemorySegmentFactory.Factory {
+
+ @Override
+ public HybridMemorySegment wrap(byte[] memory) {
+ return new HybridMemorySegment(memory);
+ }
+
+ @Override
+ public HybridMemorySegment allocateUnpooledSegment(int size, Object owner) {
+ return new HybridMemorySegment(new byte[size], owner);
+ }
+
+ @Override
+ public HybridMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
+ return new HybridMemorySegment(memory, owner);
+ }
+
+ @Override
+ public HybridMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
+ return new HybridMemorySegment(memory, owner);
+ }
+
+ /** prevent external instantiation */
+ HybridMemorySegmentFactory() {}
+ };
+
+ public static final HybridMemorySegmentFactory FACTORY = new HybridMemorySegmentFactory();
+}