You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:58:56 UTC
[06/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant
of the managed memory
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 529d3d1..f11b933 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
import org.apache.flink.configuration._
-
+import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.memory.MemoryManager.HeapMemoryPool
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
@@ -50,14 +51,13 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription,
-InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
+import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
@@ -136,7 +136,7 @@ class TaskManager(
protected val askTimeout = new Timeout(config.timeout)
/** The TaskManager's physical execution resources */
- protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
+ protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize())
/** Registry of all tasks currently executed by this TaskManager */
protected val runningTasks = new java.util.HashMap[ExecutionAttemptID, Task]()
@@ -1548,7 +1548,8 @@ object TaskManager {
val (taskManagerConfig : TaskManagerConfiguration,
netConfig: NetworkEnvironmentConfiguration,
- connectionInfo: InstanceConnectionInfo
+ connectionInfo: InstanceConnectionInfo,
+ memType: MemoryType
) = parseTaskManagerConfiguration(
configuration,
taskManagerHostname,
@@ -1577,7 +1578,7 @@ object TaskManager {
LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
configuredMemory << 20 // megabytes to bytes
}
- else {
+ else if (memType == MemoryType.HEAP) {
val fraction = configuration.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
@@ -1589,7 +1590,24 @@ object TaskManager {
fraction).toLong
LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
- s"memory (${relativeMemSize >> 20} MB).")
+ s"heap memory (${relativeMemSize >> 20} MB).")
+
+ relativeMemSize
+ }
+ else {
+ val ratio = configuration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
+
+ checkConfigParameter(ratio > 0.0f,
+ ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
+ "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
+
+ val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
+ val relativeMemSize = (maxHeapSize * ratio).toLong
+
+ LOG.info(s"Using $ratio times the heap size (${maxHeapSize} bytes) for Flink " +
+ s"managed off-heap memory (${relativeMemSize >> 20} MB).")
relativeMemSize
}
@@ -1598,16 +1616,27 @@ object TaskManager {
// now start the memory manager
val memoryManager = try {
- new DefaultMemoryManager(
+ new MemoryManager(
memorySize,
taskManagerConfig.numberOfSlots,
netConfig.networkBufferSize,
+ memType,
preAllocateMemory)
}
catch {
- case e: OutOfMemoryError => throw new Exception(
- "OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
- memorySize + " bytes).", e)
+ case e: OutOfMemoryError =>
+ memType match {
+ case MemoryType.HEAP =>
+ throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
+ s" while allocating the TaskManager heap memory (${memorySize} bytes).", e)
+
+ case MemoryType.OFF_HEAP =>
+ throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
+ s" while allocating the TaskManager off-heap memory (${memorySize} bytes). " +
+ s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+
+ case _ => throw e
+ }
}
// start the I/O manager last, it will create some temp directories.
@@ -1692,7 +1721,8 @@ object TaskManager {
localTaskManagerCommunication: Boolean)
: (TaskManagerConfiguration,
NetworkEnvironmentConfiguration,
- InstanceConnectionInfo) = {
+ InstanceConnectionInfo,
+ MemoryType) = {
// ------- read values from the config and check them ---------
// (a lot of them)
@@ -1738,9 +1768,9 @@ object TaskManager {
val pageSize: Int =
if (pageSizeNew != -1) {
// new page size has been configured
- checkConfigParameter(pageSizeNew >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeNew,
+ checkConfigParameter(pageSizeNew >= MemoryManager.MIN_PAGE_SIZE, pageSizeNew,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- "Minimum memory segment size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+ "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)
checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
@@ -1754,9 +1784,9 @@ object TaskManager {
}
else {
// old page size has been configured
- checkConfigParameter(pageSizeOld >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeOld,
+ checkConfigParameter(pageSizeOld >= MemoryManager.MIN_PAGE_SIZE, pageSizeOld,
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+ "Minimum buffer size is " + MemoryManager.MIN_PAGE_SIZE)
checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld,
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
@@ -1765,6 +1795,35 @@ object TaskManager {
pageSizeOld
}
+ // check whether we use heap or off-heap memory
+ val memType: MemoryType =
+ if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+ MemoryType.OFF_HEAP
+ } else {
+ MemoryType.HEAP
+ }
+
+ // initialize the memory segment factory accordingly
+ memType match {
+ case MemoryType.HEAP =>
+ if (!MemorySegmentFactory.isInitialized()) {
+ MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY)
+ }
+ else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
+ throw new Exception("Memory type is set to heap memory, but memory segment " +
+ "factory has been initialized for off-heap memory segments")
+ }
+
+ case MemoryType.OFF_HEAP =>
+ if (!MemorySegmentFactory.isInitialized()) {
+ MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY)
+ }
+ else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
+ throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+ "factory has been initialized for heap memory segments")
+ }
+ }
+
val tmpDirs = configuration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
@@ -1783,7 +1842,8 @@ object TaskManager {
}
// Default spill I/O mode for intermediate results
- val syncOrAsync = configuration.getString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+ val syncOrAsync = configuration.getString(
+ ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
@@ -1791,6 +1851,7 @@ object TaskManager {
val networkConfig = NetworkEnvironmentConfiguration(
numNetworkBuffers,
pageSize,
+ memType,
ioMode,
nettyConfig)
@@ -1834,7 +1895,7 @@ object TaskManager {
slots,
configuration)
- (taskManagerConfig, networkConfig, connectionInfo)
+ (taskManagerConfig, networkConfig, connectionInfo, memType)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index f2a5e2d..a44916a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.io.disk;
import java.io.EOFException;
import java.util.List;
+import org.apache.flink.core.memory.MemoryType;
import org.junit.Assert;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
@@ -32,8 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.TestData.Key;
@@ -78,7 +78,7 @@ public class ChannelViewsTest
@Before
public void beforeTest() {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, true);
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
this.ioManager = new IOManagerAsync();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index dcc1e5f..5deb50e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk;
import static org.junit.Assert.*;
+import org.apache.flink.core.memory.MemoryType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -29,8 +30,7 @@ import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.PairGenerator;
import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
@@ -66,7 +66,8 @@ public class FileChannelStreamsITCase {
@Before
public void beforeTest() {
- memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE, true);
+ memManager = new MemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1,
+ MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
ioManager = new IOManagerAsync();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index f9b8b38..1c2b3de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -26,13 +26,13 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.StringValue;
import org.junit.Test;
@@ -44,7 +44,7 @@ public class FileChannelStreamsTest {
public void testCloseAndDeleteOutputView() {
final IOManager ioManager = new IOManagerAsync();
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
+ MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -78,7 +78,7 @@ public class FileChannelStreamsTest {
public void testCloseAndDeleteInputView() {
final IOManager ioManager = new IOManagerAsync();
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
+ MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index c071bef..4c6a2b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -25,12 +25,12 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.junit.Test;
@@ -45,7 +45,7 @@ public class SeekableFileChannelInputViewTest {
// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
try {
- MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, true);
+ MemoryManager memMan = new MemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -71,7 +71,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to the middle of the 3rd page
int i = 2 * PAGE_SIZE + PAGE_SIZE / 4;
@@ -82,7 +82,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to the end
i = 120000 - 4;
@@ -93,7 +93,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to the beginning
i = 0;
@@ -104,7 +104,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to after a page
i = PAGE_SIZE;
@@ -115,7 +115,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to after a page
i = 3 * PAGE_SIZE;
@@ -126,7 +126,7 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek to the end
i = NUM_RECORDS;
@@ -134,17 +134,17 @@ public class SeekableFileChannelInputViewTest {
try {
in.readInt();
fail("should throw EOF exception");
- } catch (EOFException e) {}
+ } catch (EOFException ignored) {}
// seek out of bounds
try {
in.seek(-10);
fail("should throw an exception");
- } catch (IllegalArgumentException e) {}
+ } catch (IllegalArgumentException ignored) {}
try {
in.seek(NUM_RECORDS + 1);
fail("should throw an exception");
- } catch (IllegalArgumentException e) {}
+ } catch (IllegalArgumentException ignored) {}
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 6a9a20a..0b1e0c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk;
import org.apache.flink.core.memory.DataInputView;
@@ -24,9 +23,8 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.ListMemorySegmentSource;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
@@ -34,6 +32,7 @@ import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode
import org.apache.flink.runtime.operators.testutils.TestData.Key;
import org.apache.flink.runtime.operators.testutils.TestData.Value;
import org.apache.flink.types.Record;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,7 +41,6 @@ import org.junit.Test;
import java.io.EOFException;
import java.util.ArrayList;
-
public class SpillingBufferTest {
private static final long SEED = 649180756312423613L;
@@ -69,7 +67,7 @@ public class SpillingBufferTest {
@Before
public void beforeTest() {
- memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+ memoryManager = new MemoryManager(MEMORY_SIZE, 1);
ioManager = new IOManagerAsync();
}
@@ -91,8 +89,7 @@ public class SpillingBufferTest {
// --------------------------------------------------------------------------------------------
@Test
- public void testWriteReadInMemory() throws Exception
- {
+ public void testWriteReadInMemory() throws Exception {
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
// create the writer output view
@@ -150,9 +147,9 @@ public class SpillingBufferTest {
}
@Test
- public void testWriteReadTooMuchInMemory() throws Exception
- {
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ public void testWriteReadTooMuchInMemory() throws Exception {
+ final TestData.Generator generator = new TestData.Generator(
+ SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -217,9 +214,9 @@ public class SpillingBufferTest {
// --------------------------------------------------------------------------------------------
@Test
- public void testWriteReadExternal() throws Exception
- {
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ public void testWriteReadExternal() throws Exception {
+ final TestData.Generator generator = new TestData.Generator(
+ SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -276,9 +273,9 @@ public class SpillingBufferTest {
}
@Test
- public void testWriteReadTooMuchExternal() throws Exception
- {
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ public void testWriteReadTooMuchExternal() throws Exception {
+ final TestData.Generator generator = new TestData.Generator(
+ SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index 49e93c6..a471e66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.junit.Test;
@@ -274,7 +276,7 @@ public class AsynchronousFileIOChannelTest {
try {
final int NUM_BLOCKS = 100;
- final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+ final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
final AtomicInteger callbackCounter = new AtomicInteger();
final AtomicBoolean exceptionOccurred = new AtomicBoolean();
@@ -336,7 +338,7 @@ public class AsynchronousFileIOChannelTest {
private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
try {
- MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+ MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
FileIOChannel.ID channelId = ioMan.createChannel();
BlockChannelWriterWithCallback<MemorySegment> writer = new AsynchronousBlockWriterWithCallback(channelId,
@@ -371,7 +373,7 @@ public class AsynchronousFileIOChannelTest {
finally {
try {
writer.closeAndDelete();
- } catch (Throwable t) {}
+ } catch (Throwable ignored) {}
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 294a6e6..c1bd465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
@@ -146,7 +147,9 @@ public class BufferFileWriterFileSegmentReaderTest {
fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
- currentNumber = verifyBufferFilledWithAscendingNumbers(new Buffer(new MemorySegment(buffer.array()), BUFFER_RECYCLER), currentNumber, fileSegment.getLength());
+ currentNumber = verifyBufferFilledWithAscendingNumbers(
+ new Buffer(MemorySegmentFactory.wrap(buffer.array()), BUFFER_RECYCLER),
+ currentNumber, fileSegment.getLength());
}
reader.close();
@@ -169,7 +172,7 @@ public class BufferFileWriterFileSegmentReaderTest {
}
private Buffer createBuffer() {
- return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+ return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
}
public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index b0c702a..24d2864 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -19,9 +19,11 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -194,7 +196,7 @@ public class BufferFileWriterReaderTest {
}
private Buffer createBuffer() {
- return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+ return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
}
public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index 435588f..4656d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -62,7 +63,7 @@ public class IOManagerAsyncTest {
final FileIOChannel.ID channelID = this.ioManager.createChannel();
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channelID);
- MemorySegment memSeg = new MemorySegment(new byte[32 * 1024]);
+ MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
for (int i = 0; i < NUM_IOS; i++) {
for (int pos = 0; pos < memSeg.size(); pos += 4) {
@@ -103,7 +104,7 @@ public class IOManagerAsyncTest {
try {
final List<MemorySegment> memSegs = new ArrayList<MemorySegment>();
for (int i = 0; i < NUM_SEGS; i++) {
- memSegs.add(new MemorySegment(new byte[32 * 1024]));
+ memSegs.add(MemorySegmentFactory.allocateUnpooledSegment(32 * 1024));
}
final FileIOChannel.ID channelID = this.ioManager.createChannel();
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 52908d3..6c25117 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -35,7 +35,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
/**
* Integration test case for the I/O manager.
@@ -54,11 +54,11 @@ public class IOManagerITCase {
private IOManager ioManager;
- private DefaultMemoryManager memoryManager;
+ private MemoryManager memoryManager;
@Before
public void beforeTest() {
- memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+ memoryManager = new MemoryManager(MEMORY_SIZE, 1);
ioManager = new IOManagerAsync();
}
@@ -209,7 +209,7 @@ public class IOManagerITCase {
}
}
- private static final int skewedSample(Random rnd, int max) {
+ private static int skewedSample(Random rnd, int max) {
double uniform = rnd.nextDouble();
double var = Math.pow(uniform, 8.0);
double pareto = 0.2 / var;
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index 3bdc9bd..fd02623 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.junit.After;
import org.junit.Before;
@@ -66,14 +66,14 @@ public class IOManagerPerformanceBenchmark {
private static final AbstractInvokable memoryOwner = new DummyInvokable();
- private DefaultMemoryManager memManager;
+ private MemoryManager memManager;
private IOManager ioManager;
@Before
public void startup() {
- memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
+ memManager = new MemoryManager(MEMORY_SIZE, 1);
ioManager = new IOManagerAsync();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index d657ebf..420199c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network;
import static org.junit.Assert.*;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -55,7 +56,8 @@ public class NetworkEnvironmentTest {
try {
NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
- NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
+ NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
+ IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
new Tuple2<Integer, Integer>(0, 0));
NetworkEnvironment env = new NetworkEnvironment(
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index e7d0524..8455402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.Util;
-import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
-import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -375,7 +378,7 @@ public class PagedViewsTest {
private final int segmentSize;
private TestOutputView(int segmentSize) {
- super(new MemorySegment(new byte[segmentSize]), segmentSize, 0);
+ super(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), segmentSize, 0);
this.segmentSize = segmentSize;
}
@@ -383,7 +386,7 @@ public class PagedViewsTest {
@Override
protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
segments.add(new SegmentWithPosition(current, positionInCurrent));
- return new MemorySegment(new byte[segmentSize]);
+ return MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
}
public void close() {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index cd6d580..819a94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.Util;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
import org.junit.Assert;
import org.junit.Test;
@@ -125,7 +127,7 @@ public class SpanningRecordSerializationTest {
{
final int SERIALIZATION_OVERHEAD = 4; // length encoding
- final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index 50d3639..b7bcb3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.Util;
@@ -41,7 +42,7 @@ public class SpanningRecordSerializerTest {
final int SEGMENT_SIZE = 16;
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
Assert.assertFalse(serializer.hasData());
@@ -75,7 +76,7 @@ public class SpanningRecordSerializerTest {
final int SEGMENT_SIZE = 11;
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
try {
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
@@ -201,7 +202,7 @@ public class SpanningRecordSerializerTest {
final int SERIALIZATION_OVERHEAD = 4; // length encoding
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
// -------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9e10582..f8cd28f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.api.writer;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
@@ -25,9 +26,11 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.types.IntValue;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -41,14 +44,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkNotNull;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -158,7 +159,7 @@ public class RecordWriterTest {
BufferPool bufferPool = null;
try {
- buffers = new NetworkBufferPool(1, 1024);
+ buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
bufferPool = spy(buffers.createBufferPool(1, true));
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 28862e8..0ac84dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
+import org.apache.flink.core.memory.MemoryType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,7 +41,7 @@ public class BufferPoolFactoryTest {
@Before
public void setupNetworkBufferPool() {
- networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+ networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
}
@After
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 734dcfb..fd11d02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -32,7 +33,7 @@ public class BufferTest {
@Test
public void testSetGetSize() {
- final MemorySegment segment = new MemorySegment(new byte[1024]);
+ final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
Buffer buffer = new Buffer(segment, recycler);
@@ -58,7 +59,7 @@ public class BufferTest {
@Test
public void testgetNioBufferThreadSafe() {
- final MemorySegment segment = new MemorySegment(new byte[1024]);
+ final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
Buffer buffer = new Buffer(segment, recycler);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index e8e9ec8..93731e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.buffer;
import com.google.common.collect.Lists;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.util.event.EventListener;
import org.junit.After;
import org.junit.AfterClass;
@@ -61,7 +62,7 @@ public class LocalBufferPoolTest {
@Before
public void setupLocalBufferPool() {
- networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+ networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 6b22cd9..fd5c7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
+import org.apache.flink.core.memory.MemoryType;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,7 +36,7 @@ public class NetworkBufferPoolTest {
final int bufferSize = 128;
final int numBuffers = 10;
- NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP);
assertEquals(bufferSize, globalPool.getMemorySegmentSize());
assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
@@ -70,7 +71,7 @@ public class NetworkBufferPoolTest {
@Test
public void testDestroyAll() {
try {
- NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
BufferPool fixedPool = globalPool.createBufferPool(2, true);
BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 60241e3..f514cbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.task.IntegerTaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -53,7 +53,7 @@ public class NettyMessageSerializationTest {
@Test
public void testEncodeDecode() {
{
- Buffer buffer = spy(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
+ Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
ByteBuffer nioBuffer = buffer.getNioBuffer();
for (int i = 0; i < 1024; i += 4) {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 65780b4..cfbe99e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -68,7 +68,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
@Override
public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
if (inputIterator.next(reuse) != null) {
- final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
serializer.setNextBuffer(buffer);
serializer.addRecord(reuse);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index cd56318..ea40a55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -94,7 +95,7 @@ public class LocalInputChannelTest {
final NetworkBufferPool networkBuffers = new NetworkBufferPool(
(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
- TestBufferFactory.BUFFER_SIZE);
+ TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
final ResultPartitionConsumableNotifier partitionConsumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 82cc730..f4c37f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
@@ -100,7 +100,8 @@ public class SingleInputGateTest {
when(taskEventDispatcher.publish(any(ResultPartitionID.class), any(TaskEvent.class))).thenReturn(true);
final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
- when(iterator.getNextBuffer()).thenReturn(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
+ when(iterator.getNextBuffer()).thenReturn(
+ new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator);
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d9e3562..d628596 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.serialization;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -52,7 +52,7 @@ public class LargeRecordsTest {
final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
- final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
@@ -149,7 +149,7 @@ public class LargeRecordsTest {
final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
- final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+ final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index cdba545..4b3b465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.io.network.util;
+import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
@@ -59,7 +61,7 @@ public class TestBufferFactory {
public Buffer create() {
numberOfCreatedBuffers.incrementAndGet();
- return new Buffer(new MemorySegment(new byte[bufferSize]), bufferRecycler);
+ return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
}
public Buffer createFrom(MemorySegment segment) {
@@ -85,7 +87,7 @@ public class TestBufferFactory {
public static Buffer createBuffer(int bufferSize) {
checkArgument(bufferSize > 0);
- return new Buffer(new MemorySegment(new byte[bufferSize]), RECYCLER);
+ return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
}
public static Buffer getMockBuffer() {
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
index fc88207..15251e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.junit.After;
@@ -48,14 +47,14 @@ public class MemoryManagerLazyAllocationTest {
private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
- private DefaultMemoryManager memoryManager;
+ private MemoryManager memoryManager;
private Random random;
@Before
public void setUp() {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, false);
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, false);
this.random = new Random(RANDOM_SEED);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index c0f32ca..a20a180 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -23,9 +23,8 @@ import java.util.List;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.junit.Assert;
@@ -48,14 +47,14 @@ public class MemoryManagerTest {
private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
- private DefaultMemoryManager memoryManager;
+ private MemoryManager memoryManager;
private Random random;
@Before
public void setUp() {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
this.random = new Random(RANDOM_SEED);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
new file mode 100644
index 0000000..fad1b0e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemorySegmentSimpleTest {
+
+ public static final long RANDOM_SEED = 643196033469871L;
+
+ public static final int MANAGED_MEMORY_SIZE = 1024 * 1024 * 16;
+
+ public static final int PAGE_SIZE = 1024 * 512;
+
+ private MemoryManager manager;
+
+ private MemorySegment segment;
+
+ private Random random;
+
+ /**
+ * Builds the per-test fixture: a {@link MemoryManager} configured with
+ * {@link MemoryType#HEAP}, one page allocated from it as the segment under test,
+ * and a random generator seeded with {@link #RANDOM_SEED}.
+ *
+ * <p>Any exception raised while building the fixture is printed and turned into a
+ * test failure.
+ */
+ @Before
+ public void setUp() throws Exception{
+ try {
+ // NOTE(review): the meaning of the arguments "1" and "true" is defined by MemoryManager,
+ // which is not visible here -- presumably slot count and pre-allocation; confirm.
+ this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+ // request exactly one page and use it as the segment for all accessor tests
+ this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
+ this.random = new Random(RANDOM_SEED);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Test setup failed.");
+ }
+ }
+
+ /**
+ * Hands the segment back to the manager, drops the fixture references, and fails
+ * the test if the manager's {@code verifyEmpty()} check returns false afterwards.
+ */
+ @After
+ public void tearDown() {
+ this.manager.release(this.segment);
+ this.random = null;
+ this.segment = null;
+
+ // the only page allocated in setUp() was released above, so the check is expected to pass
+ if (!this.manager.verifyEmpty()) {
+ Assert.fail("Not all memory has been properly released.");
+ }
+ this.manager = null;
+ }
+
+ /**
+ * Exercises the byte-array {@code put}/{@code get} overloads of the segment:
+ * writes that would run past the page must raise {@link IndexOutOfBoundsException},
+ * and data written chunk by chunk must be read back unchanged, both with the
+ * whole-array overloads and with the explicit offset/length overloads.
+ */
+ @Test
+ public void bulkByteAccess() {
+
+ // test exceptions
+ {
+ byte[] bytes = new byte[PAGE_SIZE / 4];
+
+ try {
+ // PAGE_SIZE / 4 bytes starting at 3/4 of the page plus one would end one byte past PAGE_SIZE
+ segment.put(3 * (PAGE_SIZE / 4) + 1, bytes);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ // PAGE_SIZE / 8 bytes starting at 7/8 of the page plus one would end one byte past PAGE_SIZE
+ segment.put(7 * (PAGE_SIZE / 8) + 1, bytes, 0, bytes.length / 2);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior with default offset / length
+ {
+ long seed = random.nextLong();
+
+ // write pass: fill the page in eight consecutive chunks of random bytes
+ random.setSeed(seed);
+ byte[] src = new byte[PAGE_SIZE / 8];
+ for (int i = 0; i < 8; i++) {
+ random.nextBytes(src);
+ segment.put(i * (PAGE_SIZE / 8), src);
+ }
+
+ // verify pass: re-seeding regenerates the same byte sequence to compare against
+ random.setSeed(seed);
+ byte[] expected = new byte[PAGE_SIZE / 8];
+ byte[] actual = new byte[PAGE_SIZE / 8];
+ for (int i = 0; i < 8; i++) {
+ random.nextBytes(expected);
+ segment.get(i * (PAGE_SIZE / 8), actual);
+
+ assertArrayEquals(expected, actual);
+ }
+ }
+
+ // test expected correct behavior with specific offset / length
+ {
+ byte[] expected = new byte[PAGE_SIZE];
+ random.nextBytes(expected);
+
+ // copy the array into the segment in 16 slices, using the same offset in both
+ for (int i = 0; i < 16; i++) {
+ segment.put(i * (PAGE_SIZE / 16), expected, i * (PAGE_SIZE / 16),
+ PAGE_SIZE / 16);
+ }
+
+ // read the 16 slices back into a fresh array at the matching offsets
+ byte[] actual = new byte[PAGE_SIZE];
+ for (int i = 0; i < 16; i++) {
+ segment.get(i * (PAGE_SIZE / 16), actual, i * (PAGE_SIZE / 16),
+ PAGE_SIZE / 16);
+ }
+
+ assertArrayEquals(expected, actual);
+ }
+ }
+
+ /**
+ * Exercises single-byte {@code put}/{@code get}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and every byte written across the page must be
+ * read back unchanged.
+ */
+ @Test
+ public void byteAccess() {
+ // test exceptions
+ {
+ try {
+ segment.put(-1, (byte) 0);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.put(PAGE_SIZE, (byte) 0);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.get(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.get(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass over every index of the page
+ random.setSeed(seed);
+ for (int i = 0; i < PAGE_SIZE; i++) {
+ segment.put(i, (byte) random.nextInt());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i < PAGE_SIZE; i++) {
+ assertEquals((byte) random.nextInt(), segment.get(i));
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putBoolean}/{@code getBoolean}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and every boolean written across the page must be
+ * read back unchanged.
+ */
+ @Test
+ public void booleanAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putBoolean(-1, false);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putBoolean(PAGE_SIZE, false);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getBoolean(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getBoolean(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass over every index of the page
+ random.setSeed(seed);
+ for (int i = 0; i < PAGE_SIZE; i++) {
+ segment.putBoolean(i, random.nextBoolean());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i < PAGE_SIZE; i++) {
+ assertEquals(random.nextBoolean(), segment.getBoolean(i));
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putChar}/{@code getChar}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and lowercase letters written at every second
+ * index of the page must be read back unchanged.
+ */
+ @Test
+ public void charAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putChar(-1, 'a');
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putChar(PAGE_SIZE, 'a');
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getChar(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getChar(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 2, the last index used being PAGE_SIZE - 2
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+ segment.putChar(i, (char) ('a' + random.nextInt(26)));
+ }
+
+ // verify pass: re-seeding replays the same letter sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+ assertEquals((char) ('a' + random.nextInt(26)), segment.getChar(i));
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putDouble}/{@code getDouble}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and doubles written at every eighth index of the
+ * page must be read back exactly (comparison delta is 0.0).
+ */
+ @Test
+ public void doubleAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putDouble(-1, 0.0);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putDouble(PAGE_SIZE, 0.0);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getDouble(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getDouble(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 8, the last index used being PAGE_SIZE - 8
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+ segment.putDouble(i, random.nextDouble());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+ assertEquals(random.nextDouble(), segment.getDouble(i), 0.0);
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putFloat}/{@code getFloat}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and floats written at every fourth index of the
+ * page must be read back exactly (comparison delta is 0.0).
+ *
+ * <p>The {@code @Test} annotation was previously commented out, so JUnit never ran
+ * this method; it is restored here so float access is covered like the other
+ * primitive accessors.
+ */
+ @Test
+ public void floatAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putFloat(-1, 0.0f);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putFloat(PAGE_SIZE, 0.0f);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getFloat(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getFloat(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 4, the last index used being PAGE_SIZE - 4
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+ segment.putFloat(i, random.nextFloat());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+ assertEquals(random.nextFloat(), segment.getFloat(i), 0.0);
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putLong}/{@code getLong}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and longs must be read back unchanged both when
+ * written at every eighth index and when written at irregularly spaced offsets.
+ */
+ @Test
+ public void longAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putLong(-1, 0L);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putLong(PAGE_SIZE, 0L);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getLong(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getLong(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 8, the last index used being PAGE_SIZE - 8
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+ segment.putLong(i, random.nextLong());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+ assertEquals(random.nextLong(), segment.getLong(i));
+ }
+ }
+
+ // test unaligned offsets
+ {
+ final long seed = random.nextLong();
+
+ // each step is 8 + [0, 24), so offsets are generally not multiples of 8 and writes never overlap
+ random.setSeed(seed);
+ for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
+ long value = random.nextLong();
+ segment.putLong(offset, value);
+ }
+
+ // steps and values come from the same seeded stream, so this loop revisits the same offsets
+ random.setSeed(seed);
+ for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
+ long shouldValue = random.nextLong();
+ long isValue = segment.getLong(offset);
+ assertEquals(shouldValue, isValue);
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putInt}/{@code getInt}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and ints written at every fourth index of the
+ * page must be read back unchanged.
+ */
+ @Test
+ public void intAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putInt(-1, 0);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putInt(PAGE_SIZE, 0);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getInt(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getInt(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 4, the last index used being PAGE_SIZE - 4
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+ segment.putInt(i, random.nextInt());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+ assertEquals(random.nextInt(), segment.getInt(i));
+ }
+ }
+ }
+
+ /**
+ * Exercises {@code putShort}/{@code getShort}: indices -1 and PAGE_SIZE must raise
+ * {@link IndexOutOfBoundsException}, and shorts written at every second index of the
+ * page must be read back unchanged.
+ */
+ @Test
+ public void shortAccess() {
+ // test exceptions
+ {
+ try {
+ segment.putShort(-1, (short) 0);
+ // fail() throws an AssertionError, which is not an Exception and so is not caught below
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.putShort(PAGE_SIZE, (short) 0);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getShort(-1);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+
+ try {
+ segment.getShort(PAGE_SIZE);
+ fail("IndexOutOfBoundsException expected");
+ } catch (Exception e) {
+ assertTrue(e instanceof IndexOutOfBoundsException);
+ }
+ }
+
+ // test expected correct behavior
+ {
+ long seed = random.nextLong();
+
+ // write pass in steps of 2, the last index used being PAGE_SIZE - 2
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+ segment.putShort(i, (short) random.nextInt());
+ }
+
+ // verify pass: re-seeding replays the same value sequence
+ random.setSeed(seed);
+ for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+ assertEquals((short) random.nextInt(), segment.getShort(i));
+ }
+ }
+ }
+
+ /**
+ * Checks {@code wrap(offset, length)} on a separately allocated 1024-byte segment:
+ * the returned {@link ByteBuffer} must have its position at {@code offset}, its limit
+ * at {@code offset + length}, and {@code length} bytes remaining.
+ */
+ @Test
+ public void testByteBufferWrapping() {
+ try {
+ // uses its own unpooled segment rather than the page allocated in setUp()
+ MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
+
+ ByteBuffer buf1 = seg.wrap(13, 47);
+ assertEquals(13, buf1.position());
+ assertEquals(60, buf1.limit());
+ assertEquals(47, buf1.remaining());
+
+ ByteBuffer buf2 = seg.wrap(500, 267);
+ assertEquals(500, buf2.position());
+ assertEquals(767, buf2.limit());
+ assertEquals(267, buf2.remaining());
+
+ // wrapping the full extent of the segment
+ ByteBuffer buf3 = seg.wrap(0, 1024);
+ assertEquals(0, buf3.position());
+ assertEquals(1024, buf3.limit());
+ assertEquals(1024, buf3.remaining());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}