You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/03 10:37:19 UTC
[1/3] flink git commit: [distributed runtime] Throw interrupted exception during partition request client creation
Repository: flink
Updated Branches:
refs/heads/master 2a528712d -> 940704156
[distributed runtime] Throw interrupted exception during partition request client creation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94070415
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94070415
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94070415
Branch: refs/heads/master
Commit: 9407041565b5402cd6c1923fbfa764e01a5db178
Parents: 6da093a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:08:33 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/io/network/ConnectionManager.java | 2 +-
.../runtime/io/network/netty/NettyConnectionManager.java | 2 +-
.../io/network/netty/PartitionRequestClientFactory.java | 11 +++--------
.../io/network/partition/consumer/InputChannel.java | 2 +-
.../runtime/io/network/partition/consumer/InputGate.java | 2 +-
.../network/partition/consumer/RemoteInputChannel.java | 2 +-
.../io/network/partition/consumer/SingleInputGate.java | 4 ++--
.../io/network/partition/consumer/UnionInputGate.java | 2 +-
.../org/apache/flink/streaming/io/CoRecordReader.java | 2 +-
9 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index d478e0f..76f8bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -34,7 +34,7 @@ public interface ConnectionManager {
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
*/
- PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
+ PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
/**
* Closes opened ChannelConnections in case of a resource release
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 5d03c15..260ea7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -49,7 +49,7 @@ public class NettyConnectionManager implements ConnectionManager {
}
@Override
- public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+ public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d7e6efd..d4c022b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -48,7 +48,7 @@ class PartitionRequestClientFactory {
* Atomically establishes a TCP connection to the given remote address and
* creates a {@link PartitionRequestClient} instance for this connection.
*/
- PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+ PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
Object entry;
PartitionRequestClient client = null;
@@ -182,15 +182,10 @@ class PartitionRequestClientFactory {
private volatile Throwable error;
- private PartitionRequestClient waitForChannel() throws IOException {
+ private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
synchronized (connectLock) {
while (error == null && partitionRequestClient == null) {
- try {
- connectLock.wait(2000);
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Wait for channel connection interrupted.");
- }
+ connectLock.wait(2000);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 31b67ca..7173566 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -92,7 +92,7 @@ public abstract class InputChannel {
* The queue index to request depends on which sub task the channel belongs
* to and is specified by the consumer of this channel.
*/
- public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException;
+ public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException;
/**
* Returns the next buffer from the consumed subpartition.
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 8d28084..43cdd29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -29,7 +29,7 @@ public interface InputGate {
public boolean isFinished();
- public void requestPartitions() throws IOException;
+ public void requestPartitions() throws IOException, InterruptedException;
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index daf94e6..d50ddc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -85,7 +85,7 @@ public class RemoteInputChannel extends InputChannel {
// ------------------------------------------------------------------------
@Override
- public void requestIntermediateResultPartition(int queueIndex) throws IOException {
+ public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId);
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0383cca..19898c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate {
}
}
- public void updateInputChannel(PartitionInfo partitionInfo) throws IOException {
+ public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException {
synchronized (requestLock) {
if (releasedResourcesFlag) {
// There was a race with a task failure/cancel
@@ -273,7 +273,7 @@ public class SingleInputGate implements InputGate {
}
@Override
- public void requestPartitions() throws IOException {
+ public void requestPartitions() throws IOException, InterruptedException {
if (!requestedPartitionsFlag) {
// Sanity check
if (numberOfInputChannels != inputChannels.size()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 4994f13..5a7a5b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -120,7 +120,7 @@ public class UnionInputGate implements InputGate {
}
@Override
- public void requestPartitions() throws IOException {
+ public void requestPartitions() throws IOException, InterruptedException {
if (!requestedPartitionsFlag) {
for (InputGate inputGate : inputGates) {
inputGate.requestPartitions();
http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb3a659..84b08f7 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -78,7 +78,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
bufferReader2.registerListener(this);
}
- public void requestPartitionsOnce() throws IOException {
+ public void requestPartitionsOnce() throws IOException, InterruptedException {
if (!hasRequestedPartitions) {
bufferReader1.requestPartitions();
bufferReader2.requestPartitions();
[2/3] flink git commit: [distributed runtime] Notify about error when handing in channel
Posted by uc...@apache.org.
[distributed runtime] Notify about error when handing in channel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6da093a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6da093a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6da093a6
Branch: refs/heads/master
Commit: 6da093a659180dadf6a2b6a56d10d88fbd19ed8c
Parents: 256b2ee
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:01:04 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100
----------------------------------------------------------------------
.../netty/PartitionRequestClientFactory.java | 42 ++++++++++++--------
1 file changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6da093a6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d64548d..d7e6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -52,20 +52,22 @@ class PartitionRequestClientFactory {
Object entry;
PartitionRequestClient client = null;
- while(client == null) {
+ while (client == null) {
entry = clients.get(remoteAddress);
if (entry != null) {
// Existing channel or connecting channel
if (entry instanceof PartitionRequestClient) {
client = (PartitionRequestClient) entry;
- } else {
+ }
+ else {
ConnectingChannel future = (ConnectingChannel) entry;
client = future.waitForChannel();
clients.replace(remoteAddress, future, client);
}
- } else {
+ }
+ else {
// No channel yet. Create one, but watch out for a race.
// We create a "connecting future" and atomically add it to the map.
// Only the thread that really added it establishes the channel.
@@ -79,18 +81,20 @@ class PartitionRequestClientFactory {
client = connectingChannel.waitForChannel();
clients.replace(remoteAddress, connectingChannel, client);
- } else if (old instanceof ConnectingChannel) {
+ }
+ else if (old instanceof ConnectingChannel) {
client = ((ConnectingChannel) old).waitForChannel();
clients.replace(remoteAddress, old, client);
- } else {
+ }
+ else {
client = (PartitionRequestClient) old;
}
}
// Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing.
- if(!client.incrementReferenceCounter()){
+ if (!client.incrementReferenceCounter()) {
destroyPartitionRequestClient(remoteAddress, client);
client = null;
}
@@ -102,7 +106,7 @@ class PartitionRequestClientFactory {
public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
Object entry = clients.get(remoteAddress);
- if(entry instanceof ConnectingChannel) {
+ if (entry instanceof ConnectingChannel) {
ConnectingChannel channel = (ConnectingChannel) entry;
if (channel.dispose()) {
@@ -141,8 +145,9 @@ class PartitionRequestClientFactory {
boolean result;
synchronized (connectLock) {
if (partitionRequestClient != null) {
- result = partitionRequestClient.disposeIfNotUsed();
- } else {
+ result = partitionRequestClient.disposeIfNotUsed();
+ }
+ else {
disposeRequestClient = true;
result = true;
}
@@ -155,16 +160,21 @@ class PartitionRequestClientFactory {
private void handInChannel(Channel channel) {
synchronized (connectLock) {
- PartitionRequestClientHandler requestHandler =
- (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
+ try {
+ PartitionRequestClientHandler requestHandler =
+ (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
- partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
+ partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
- if (disposeRequestClient) {
- partitionRequestClient.disposeIfNotUsed();
- }
+ if (disposeRequestClient) {
+ partitionRequestClient.disposeIfNotUsed();
+ }
- connectLock.notifyAll();
+ connectLock.notifyAll();
+ }
+ catch (Throwable t) {
+ notifyOfError(t);
+ }
}
}
[3/3] flink git commit: [distributed runtime] [tests] Add helper setters for NettyConfig
Posted by uc...@apache.org.
[distributed runtime] [tests] Add helper setters for NettyConfig
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256b2ee3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256b2ee3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256b2ee3
Branch: refs/heads/master
Commit: 256b2ee3e530eea80d73ae1dd013648de26af5b5
Parents: 2a52871
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 18:41:15 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyClient.java | 2 +-
.../runtime/io/network/netty/NettyConfig.java | 76 ++++++++++++++++++--
2 files changed, 71 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/256b2ee3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 35bccf4..86e84ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -88,7 +88,7 @@ class NettyClient {
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
// Timeout for new connections
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutMs() * 1000);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000);
// Pooled allocator for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
http://git-wip-us.apache.org/repos/asf/flink/blob/256b2ee3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index a3e01fe..45ffb15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -31,10 +31,22 @@ public class NettyConfig {
private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
+ // - Config keys ----------------------------------------------------------
+
public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
+ public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";
+
+ public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";
+
+ public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";
+
+ public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
+
+ // ------------------------------------------------------------------------
+
static enum TransportType {
NIO, EPOLL, AUTO
}
@@ -78,10 +90,62 @@ public class NettyConfig {
}
// ------------------------------------------------------------------------
+ // Setters
+ // ------------------------------------------------------------------------
+
+ NettyConfig setServerConnectBacklog(int connectBacklog) {
+ checkArgument(connectBacklog >= 0);
+ config.setInteger(CONNECT_BACKLOG, connectBacklog);
+
+ return this;
+ }
+
+ NettyConfig setServerNumThreads(int numThreads) {
+ checkArgument(numThreads >= 0);
+ config.setInteger(NUM_THREADS_SERVER, numThreads);
+
+ return this;
+ }
+
+ NettyConfig setClientNumThreads(int numThreads) {
+ checkArgument(numThreads >= 0);
+ config.setInteger(NUM_THREADS_CLIENT, numThreads);
+
+ return this;
+ }
+
+ NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
+ checkArgument(connectTimeoutSeconds >= 0);
+ config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);
+
+ return this;
+ }
+
+ NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
+ checkArgument(bufferSize >= 0);
+ config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);
+
+ return this;
+ }
+
+ NettyConfig setTransportType(String transport) {
+ if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
+ config.setString(TRANSPORT_TYPE, transport);
+ }
+ else {
+ throw new IllegalArgumentException("Unknown transport type.");
+ }
+
+ return this;
+ }
+
+ // ------------------------------------------------------------------------
+ // Getters
+ // ------------------------------------------------------------------------
int getServerConnectBacklog() {
// default: 0 => Netty's default
- return config.getInteger("taskmanager.net.server.backlog", 0);
+ return config.getInteger(CONNECT_BACKLOG, 0);
}
int getServerNumThreads() {
@@ -94,18 +158,18 @@ public class NettyConfig {
return config.getInteger(NUM_THREADS_CLIENT, 0);
}
- int getClientConnectTimeoutMs() {
+ int getClientConnectTimeoutSeconds() {
// default: 120s = 2min
- return config.getInteger("taskmanager.net.client.connectTimeoutSec", 120);
+ return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
}
int getSendAndReceiveBufferSize() {
// default: 0 => Netty's default
- return config.getInteger("taskmanager.net.sendReceiveBufferSize", 0);
+ return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
}
TransportType getTransportType() {
- String transport = config.getString("taskmanager.net.transport", "nio");
+ String transport = config.getString(TRANSPORT_TYPE, "nio");
if (transport.equals("nio")) {
return TransportType.NIO;
@@ -138,7 +202,7 @@ public class NettyConfig {
getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
- getClientConnectTimeoutMs(), getSendAndReceiveBufferSize(),
+ getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
}