You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:47:06 UTC

[41/53] [abbrv] git commit: [FLINK-949] Properly report GlobalBufferPool OutOfMemoryError to TaskManager

[FLINK-949] Properly report GlobalBufferPool OutOfMemoryError to TaskManager

This closes #28.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3d6cc5f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3d6cc5f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3d6cc5f4

Branch: refs/heads/travis_test
Commit: 3d6cc5f48dc5a0336f8afdd35f59f1ee25357766
Parents: 96e76a5
Author: uce <u....@fu-berlin.de>
Authored: Thu Jun 19 02:42:03 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 12:07:16 2014 +0200

----------------------------------------------------------------------
 .../nephele/taskmanager/TaskManager.java        |  9 ++++----
 .../runtime/io/network/ChannelManager.java      |  6 ++++-
 .../bufferprovider/GlobalBufferPool.java        | 24 ++++++++++++++++----
 3 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index bedafaf..cb06d08 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -288,11 +288,7 @@ public class TaskManager implements TaskOperationProtocol {
 				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
 
 		checkTempDirs(tmpDirPaths);
-		
-		final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
 
-		// Initialize network buffer pool
 		int numBuffers = GlobalConfiguration.getInteger(
 				ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
 				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
@@ -335,7 +331,7 @@ public class TaskManager implements TaskOperationProtocol {
 			channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
 		} catch (IOException ioe) {
 			LOG.error(StringUtils.stringifyException(ioe));
-			throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe);
+			throw new Exception("Failed to instantiate ChannelManager.", ioe);
 		}
 
 		{
@@ -363,6 +359,9 @@ public class TaskManager implements TaskOperationProtocol {
 			}
 			this.hardwareDescription = resources;
 
+			final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+					ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+
 			// Initialize the memory manager
 			LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " +
 					"Page size is " + pageSize + " bytes.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
index 34d3501..10a0cfb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -79,7 +79,11 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		this.channelLookupService = channelLookupService;
 		this.connectionInfo = connectionInfo;
 
-		this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+		try {
+			this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+		} catch (Throwable e) {
+			throw new IOException("Failed to instantiate GlobalBufferPool.", e);
+		}
 
 		this.networkConnectionManager = networkConnectionManager;
 		networkConnectionManager.start(this);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d6cc5f4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
index 2141017..aca1a9f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -52,13 +52,27 @@ public final class GlobalBufferPool {
 		this.numBuffers = numBuffers;
 		this.bufferSize = bufferSize;
 
-		this.buffers = new ArrayBlockingQueue<MemorySegment>(this.numBuffers);
-		for (int i = 0; i < this.numBuffers; i++) {
-			this.buffers.add(new MemorySegment(new byte[this.bufferSize]));
+		buffers = new ArrayBlockingQueue<MemorySegment>(numBuffers);
+
+		final int mb = 1 << 20;
+		final int memRequiredMb = (numBuffers * bufferSize) / mb;
+
+		for (int i = 0; i < numBuffers; i++) {
+			try {
+				byte[] buf = new byte[bufferSize];
+				buffers.add(new MemorySegment(buf));
+			} catch (OutOfMemoryError err) {
+				int memAllocatedMb = ((i + 1) * bufferSize) / mb;
+
+				String msg = String.format("Tried to allocate %d buffers of size %d bytes each (total: %d MB) " +
+						"and ran out of memory after %d buffers (%d MB).",
+						numBuffers, bufferSize, memRequiredMb, i + 1, memAllocatedMb);
+				throw new OutOfMemoryError(msg);
+			}
 		}
 
-		LOG.info(String.format("Initialized global buffer pool with %d buffers (%d bytes each).",
-				this.numBuffers, this.bufferSize));
+		LOG.info(String.format("Allocated %d buffers of size %d bytes each (total: %d MB).",
+				numBuffers, bufferSize, memRequiredMb));
 	}
 
 	// -----------------------------------------------------------------------------------------------------------------