You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:47:06 UTC
[41/53] [abbrv] git commit: [FLINK-949] Properly report GlobalBufferPool OutOfMemoryError to TaskManager
[FLINK-949] Properly report GlobalBufferPool OutOfMemoryError to TaskManager
This closes #28.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3d6cc5f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3d6cc5f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3d6cc5f4
Branch: refs/heads/travis_test
Commit: 3d6cc5f48dc5a0336f8afdd35f59f1ee25357766
Parents: 96e76a5
Author: uce <u....@fu-berlin.de>
Authored: Thu Jun 19 02:42:03 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 12:07:16 2014 +0200
----------------------------------------------------------------------
.../nephele/taskmanager/TaskManager.java | 9 ++++----
.../runtime/io/network/ChannelManager.java | 6 ++++-
.../bufferprovider/GlobalBufferPool.java | 24 ++++++++++++++++----
3 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index bedafaf..cb06d08 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -288,11 +288,7 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
checkTempDirs(tmpDirPaths);
-
- final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
- // Initialize network buffer pool
int numBuffers = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
@@ -335,7 +331,7 @@ public class TaskManager implements TaskOperationProtocol {
channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
- throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe);
+ throw new Exception("Failed to instantiate ChannelManager.", ioe);
}
{
@@ -363,6 +359,9 @@ public class TaskManager implements TaskOperationProtocol {
}
this.hardwareDescription = resources;
+ final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+
// Initialize the memory manager
LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " +
"Page size is " + pageSize + " bytes.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
index 34d3501..10a0cfb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -79,7 +79,11 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
this.channelLookupService = channelLookupService;
this.connectionInfo = connectionInfo;
- this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+ try {
+ this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+ } catch (Throwable e) {
+ throw new IOException("Failed to instantiate GlobalBufferPool.", e);
+ }
this.networkConnectionManager = networkConnectionManager;
networkConnectionManager.start(this);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
index 2141017..aca1a9f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -52,13 +52,27 @@ public final class GlobalBufferPool {
this.numBuffers = numBuffers;
this.bufferSize = bufferSize;
- this.buffers = new ArrayBlockingQueue<MemorySegment>(this.numBuffers);
- for (int i = 0; i < this.numBuffers; i++) {
- this.buffers.add(new MemorySegment(new byte[this.bufferSize]));
+ buffers = new ArrayBlockingQueue<MemorySegment>(numBuffers);
+
+ final int mb = 1 << 20;
+ final int memRequiredMb = (numBuffers * bufferSize) / mb;
+
+ for (int i = 0; i < numBuffers; i++) {
+ try {
+ byte[] buf = new byte[bufferSize];
+ buffers.add(new MemorySegment(buf));
+ } catch (OutOfMemoryError err) {
+ int memAllocatedMb = ((i + 1) * bufferSize) / mb;
+
+ String msg = String.format("Tried to allocate %d buffers of size %d bytes each (total: %d MB) " +
+ "and ran out of memory after %d buffers (%d MB).",
+ numBuffers, bufferSize, memRequiredMb, i + 1, memAllocatedMb);
+ throw new OutOfMemoryError(msg);
+ }
}
- LOG.info(String.format("Initialized global buffer pool with %d buffers (%d bytes each).",
- this.numBuffers, this.bufferSize));
+ LOG.info(String.format("Allocated %d buffers of size %d bytes each (total: %d MB).",
+ numBuffers, bufferSize, memRequiredMb));
}
// -----------------------------------------------------------------------------------------------------------------