You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/03 10:37:20 UTC
[2/3] flink git commit: [distributed runtime] Notify about error when handing in channel
[distributed runtime] Notify about error when handing in channel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6da093a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6da093a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6da093a6
Branch: refs/heads/master
Commit: 6da093a659180dadf6a2b6a56d10d88fbd19ed8c
Parents: 256b2ee
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:01:04 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100
----------------------------------------------------------------------
.../netty/PartitionRequestClientFactory.java | 42 ++++++++++++--------
1 file changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6da093a6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d64548d..d7e6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -52,20 +52,22 @@ class PartitionRequestClientFactory {
Object entry;
PartitionRequestClient client = null;
- while(client == null) {
+ while (client == null) {
entry = clients.get(remoteAddress);
if (entry != null) {
// Existing channel or connecting channel
if (entry instanceof PartitionRequestClient) {
client = (PartitionRequestClient) entry;
- } else {
+ }
+ else {
ConnectingChannel future = (ConnectingChannel) entry;
client = future.waitForChannel();
clients.replace(remoteAddress, future, client);
}
- } else {
+ }
+ else {
// No channel yet. Create one, but watch out for a race.
// We create a "connecting future" and atomically add it to the map.
// Only the thread that really added it establishes the channel.
@@ -79,18 +81,20 @@ class PartitionRequestClientFactory {
client = connectingChannel.waitForChannel();
clients.replace(remoteAddress, connectingChannel, client);
- } else if (old instanceof ConnectingChannel) {
+ }
+ else if (old instanceof ConnectingChannel) {
client = ((ConnectingChannel) old).waitForChannel();
clients.replace(remoteAddress, old, client);
- } else {
+ }
+ else {
client = (PartitionRequestClient) old;
}
}
// Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing.
- if(!client.incrementReferenceCounter()){
+ if (!client.incrementReferenceCounter()) {
destroyPartitionRequestClient(remoteAddress, client);
client = null;
}
@@ -102,7 +106,7 @@ class PartitionRequestClientFactory {
public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
Object entry = clients.get(remoteAddress);
- if(entry instanceof ConnectingChannel) {
+ if (entry instanceof ConnectingChannel) {
ConnectingChannel channel = (ConnectingChannel) entry;
if (channel.dispose()) {
@@ -141,8 +145,9 @@ class PartitionRequestClientFactory {
boolean result;
synchronized (connectLock) {
if (partitionRequestClient != null) {
- result = partitionRequestClient.disposeIfNotUsed();
- } else {
+ result = partitionRequestClient.disposeIfNotUsed();
+ }
+ else {
disposeRequestClient = true;
result = true;
}
@@ -155,16 +160,21 @@ class PartitionRequestClientFactory {
private void handInChannel(Channel channel) {
synchronized (connectLock) {
- PartitionRequestClientHandler requestHandler =
- (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
+ try {
+ PartitionRequestClientHandler requestHandler =
+ (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
- partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
+ partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
- if (disposeRequestClient) {
- partitionRequestClient.disposeIfNotUsed();
- }
+ if (disposeRequestClient) {
+ partitionRequestClient.disposeIfNotUsed();
+ }
- connectLock.notifyAll();
+ connectLock.notifyAll();
+ }
+ catch (Throwable t) {
+ notifyOfError(t);
+ }
}
}