You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/03 10:37:19 UTC

[1/3] flink git commit: [distributed runtime] Throw interrupted exception during partition request client creation

Repository: flink
Updated Branches:
  refs/heads/master 2a528712d -> 940704156


[distributed runtime] Throw interrupted exception during partition request client creation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94070415
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94070415
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94070415

Branch: refs/heads/master
Commit: 9407041565b5402cd6c1923fbfa764e01a5db178
Parents: 6da093a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:08:33 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/ConnectionManager.java      |  2 +-
 .../runtime/io/network/netty/NettyConnectionManager.java |  2 +-
 .../io/network/netty/PartitionRequestClientFactory.java  | 11 +++--------
 .../io/network/partition/consumer/InputChannel.java      |  2 +-
 .../runtime/io/network/partition/consumer/InputGate.java |  2 +-
 .../network/partition/consumer/RemoteInputChannel.java   |  2 +-
 .../io/network/partition/consumer/SingleInputGate.java   |  4 ++--
 .../io/network/partition/consumer/UnionInputGate.java    |  2 +-
 .../org/apache/flink/streaming/io/CoRecordReader.java    |  2 +-
 9 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index d478e0f..76f8bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -34,7 +34,7 @@ public interface ConnectionManager {
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
 	 */
-	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
+	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
 
 	/**
 	 * Closes opened ChannelConnections in case of a resource release

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 5d03c15..260ea7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -49,7 +49,7 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
 		return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d7e6efd..d4c022b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -48,7 +48,7 @@ class PartitionRequestClientFactory {
 	 * Atomically establishes a TCP connection to the given remote address and
 	 * creates a {@link PartitionRequestClient} instance for this connection.
 	 */
-	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
 		Object entry;
 		PartitionRequestClient client = null;
 
@@ -182,15 +182,10 @@ class PartitionRequestClientFactory {
 
 		private volatile Throwable error;
 
-		private PartitionRequestClient waitForChannel() throws IOException {
+		private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
 			synchronized (connectLock) {
 				while (error == null && partitionRequestClient == null) {
-					try {
-						connectLock.wait(2000);
-					}
-					catch (InterruptedException e) {
-						throw new RuntimeException("Wait for channel connection interrupted.");
-					}
+					connectLock.wait(2000);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 31b67ca..7173566 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -92,7 +92,7 @@ public abstract class InputChannel {
 	 * The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
-	public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException;
+	public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException;
 
 	/**
 	 * Returns the next buffer from the consumed subpartition.

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 8d28084..43cdd29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -29,7 +29,7 @@ public interface InputGate {
 
 	public boolean isFinished();
 
-	public void requestPartitions() throws IOException;
+	public void requestPartitions() throws IOException, InterruptedException;
 
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index daf94e6..d50ddc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -85,7 +85,7 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
+	public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
 			LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0383cca..19898c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
-	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException {
+	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException {
 		synchronized (requestLock) {
 			if (releasedResourcesFlag) {
 				// There was a race with a task failure/cancel
@@ -273,7 +273,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException {
+	public void requestPartitions() throws IOException, InterruptedException {
 		if (!requestedPartitionsFlag) {
 			// Sanity check
 			if (numberOfInputChannels != inputChannels.size()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 4994f13..5a7a5b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -120,7 +120,7 @@ public class UnionInputGate implements InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException {
+	public void requestPartitions() throws IOException, InterruptedException {
 		if (!requestedPartitionsFlag) {
 			for (InputGate inputGate : inputGates) {
 				inputGate.requestPartitions();

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb3a659..84b08f7 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -78,7 +78,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 		bufferReader2.registerListener(this);
 	}
 
-	public void requestPartitionsOnce() throws IOException {
+	public void requestPartitionsOnce() throws IOException, InterruptedException {
 		if (!hasRequestedPartitions) {
 			bufferReader1.requestPartitions();
 			bufferReader2.requestPartitions();


[2/3] flink git commit: [distributed runtime] Notify about error when handing in channel

Posted by uc...@apache.org.
[distributed runtime] Notify about error when handing in channel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6da093a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6da093a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6da093a6

Branch: refs/heads/master
Commit: 6da093a659180dadf6a2b6a56d10d88fbd19ed8c
Parents: 256b2ee
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:01:04 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100

----------------------------------------------------------------------
 .../netty/PartitionRequestClientFactory.java    | 42 ++++++++++++--------
 1 file changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6da093a6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d64548d..d7e6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -52,20 +52,22 @@ class PartitionRequestClientFactory {
 		Object entry;
 		PartitionRequestClient client = null;
 
-		while(client == null) {
+		while (client == null) {
 			entry = clients.get(remoteAddress);
 
 			if (entry != null) {
 				// Existing channel or connecting channel
 				if (entry instanceof PartitionRequestClient) {
 					client = (PartitionRequestClient) entry;
-				} else {
+				}
+				else {
 					ConnectingChannel future = (ConnectingChannel) entry;
 					client = future.waitForChannel();
 
 					clients.replace(remoteAddress, future, client);
 				}
-			} else {
+			}
+			else {
 				// No channel yet. Create one, but watch out for a race.
 				// We create a "connecting future" and atomically add it to the map.
 				// Only the thread that really added it establishes the channel.
@@ -79,18 +81,20 @@ class PartitionRequestClientFactory {
 					client = connectingChannel.waitForChannel();
 
 					clients.replace(remoteAddress, connectingChannel, client);
-				} else if (old instanceof ConnectingChannel) {
+				}
+				else if (old instanceof ConnectingChannel) {
 					client = ((ConnectingChannel) old).waitForChannel();
 
 					clients.replace(remoteAddress, old, client);
-				} else {
+				}
+				else {
 					client = (PartitionRequestClient) old;
 				}
 			}
 
 			// Make sure to increment the reference count before handing a client
 			// out to ensure correct bookkeeping for channel closing.
-			if(!client.incrementReferenceCounter()){
+			if (!client.incrementReferenceCounter()) {
 				destroyPartitionRequestClient(remoteAddress, client);
 				client = null;
 			}
@@ -102,7 +106,7 @@ class PartitionRequestClientFactory {
 	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
 		Object entry = clients.get(remoteAddress);
 
-		if(entry instanceof ConnectingChannel) {
+		if (entry instanceof ConnectingChannel) {
 			ConnectingChannel channel = (ConnectingChannel) entry;
 
 			if (channel.dispose()) {
@@ -141,8 +145,9 @@ class PartitionRequestClientFactory {
 			boolean result;
 			synchronized (connectLock) {
 				if (partitionRequestClient != null) {
-					result =  partitionRequestClient.disposeIfNotUsed();
-				} else {
+					result = partitionRequestClient.disposeIfNotUsed();
+				}
+				else {
 					disposeRequestClient = true;
 					result = true;
 				}
@@ -155,16 +160,21 @@ class PartitionRequestClientFactory {
 
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
-				PartitionRequestClientHandler requestHandler =
-						(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
+				try {
+					PartitionRequestClientHandler requestHandler =
+							(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
 
-				partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
+					partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
 
-				if (disposeRequestClient) {
-					partitionRequestClient.disposeIfNotUsed();
-				}
+					if (disposeRequestClient) {
+						partitionRequestClient.disposeIfNotUsed();
+					}
 
-				connectLock.notifyAll();
+					connectLock.notifyAll();
+				}
+				catch (Throwable t) {
+					notifyOfError(t);
+				}
 			}
 		}
 


[3/3] flink git commit: [distributed runtime] [tests] Add helper setters for NettyConfig

Posted by uc...@apache.org.
[distributed runtime] [tests] Add helper setters for NettyConfig


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256b2ee3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256b2ee3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256b2ee3

Branch: refs/heads/master
Commit: 256b2ee3e530eea80d73ae1dd013648de26af5b5
Parents: 2a52871
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 18:41:15 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyClient.java   |  2 +-
 .../runtime/io/network/netty/NettyConfig.java   | 76 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256b2ee3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 35bccf4..86e84ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -88,7 +88,7 @@ class NettyClient {
 		bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
 
 		// Timeout for new connections
-		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutMs() * 1000);
+		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000);
 
 		// Pooled allocator for Netty's ByteBuf instances
 		bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

http://git-wip-us.apache.org/repos/asf/flink/blob/256b2ee3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index a3e01fe..45ffb15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -31,10 +31,22 @@ public class NettyConfig {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
 
+	// - Config keys ----------------------------------------------------------
+
 	public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
 
 	public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
 
+	public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";
+
+	public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";
+
+	public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";
+
+	public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
+
+	// ------------------------------------------------------------------------
+
 	static enum TransportType {
 		NIO, EPOLL, AUTO
 	}
@@ -78,10 +90,62 @@ public class NettyConfig {
 	}
 
 	// ------------------------------------------------------------------------
+	// Setters
+	// ------------------------------------------------------------------------
+
+	NettyConfig setServerConnectBacklog(int connectBacklog) {
+		checkArgument(connectBacklog >= 0);
+		config.setInteger(CONNECT_BACKLOG, connectBacklog);
+
+		return this;
+	}
+
+	NettyConfig setServerNumThreads(int numThreads) {
+		checkArgument(numThreads >= 0);
+		config.setInteger(NUM_THREADS_SERVER, numThreads);
+
+		return this;
+	}
+
+	NettyConfig setClientNumThreads(int numThreads) {
+		checkArgument(numThreads >= 0);
+		config.setInteger(NUM_THREADS_CLIENT, numThreads);
+
+		return this;
+	}
+
+	NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
+		checkArgument(connectTimeoutSeconds >= 0);
+		config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);
+
+		return this;
+	}
+
+	NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
+		checkArgument(bufferSize >= 0);
+		config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);
+
+		return this;
+	}
+
+	NettyConfig setTransportType(String transport) {
+		if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
+			config.setString(TRANSPORT_TYPE, transport);
+		}
+		else {
+			throw new IllegalArgumentException("Unknown transport type.");
+		}
+
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	// Getters
+	// ------------------------------------------------------------------------
 
 	int getServerConnectBacklog() {
 		// default: 0 => Netty's default
-		return config.getInteger("taskmanager.net.server.backlog", 0);
+		return config.getInteger(CONNECT_BACKLOG, 0);
 	}
 
 	int getServerNumThreads() {
@@ -94,18 +158,18 @@ public class NettyConfig {
 		return config.getInteger(NUM_THREADS_CLIENT, 0);
 	}
 
-	int getClientConnectTimeoutMs() {
+	int getClientConnectTimeoutSeconds() {
 		// default: 120s = 2min
-		return config.getInteger("taskmanager.net.client.connectTimeoutSec", 120);
+		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
 	}
 
 	int getSendAndReceiveBufferSize() {
 		// default: 0 => Netty's default
-		return config.getInteger("taskmanager.net.sendReceiveBufferSize", 0);
+		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
 	}
 
 	TransportType getTransportType() {
-		String transport = config.getString("taskmanager.net.transport", "nio");
+		String transport = config.getString(TRANSPORT_TYPE, "nio");
 
 		if (transport.equals("nio")) {
 			return TransportType.NIO;
@@ -138,7 +202,7 @@ public class NettyConfig {
 				getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man,
 				getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
 				getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
-				getClientConnectTimeoutMs(), getSendAndReceiveBufferSize(),
+				getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
 				getSendAndReceiveBufferSize() == 0 ? def : man);
 	}
 }