You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/03 10:37:20 UTC

[2/3] flink git commit: [distributed runtime] Notify about error when handing in channel

[distributed runtime] Notify about the error when handing in a channel fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6da093a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6da093a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6da093a6

Branch: refs/heads/master
Commit: 6da093a659180dadf6a2b6a56d10d88fbd19ed8c
Parents: 256b2ee
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Mar 2 20:01:04 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100

----------------------------------------------------------------------
 .../netty/PartitionRequestClientFactory.java    | 42 ++++++++++++--------
 1 file changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6da093a6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d64548d..d7e6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -52,20 +52,22 @@ class PartitionRequestClientFactory {
 		Object entry;
 		PartitionRequestClient client = null;
 
-		while(client == null) {
+		while (client == null) {
 			entry = clients.get(remoteAddress);
 
 			if (entry != null) {
 				// Existing channel or connecting channel
 				if (entry instanceof PartitionRequestClient) {
 					client = (PartitionRequestClient) entry;
-				} else {
+				}
+				else {
 					ConnectingChannel future = (ConnectingChannel) entry;
 					client = future.waitForChannel();
 
 					clients.replace(remoteAddress, future, client);
 				}
-			} else {
+			}
+			else {
 				// No channel yet. Create one, but watch out for a race.
 				// We create a "connecting future" and atomically add it to the map.
 				// Only the thread that really added it establishes the channel.
@@ -79,18 +81,20 @@ class PartitionRequestClientFactory {
 					client = connectingChannel.waitForChannel();
 
 					clients.replace(remoteAddress, connectingChannel, client);
-				} else if (old instanceof ConnectingChannel) {
+				}
+				else if (old instanceof ConnectingChannel) {
 					client = ((ConnectingChannel) old).waitForChannel();
 
 					clients.replace(remoteAddress, old, client);
-				} else {
+				}
+				else {
 					client = (PartitionRequestClient) old;
 				}
 			}
 
 			// Make sure to increment the reference count before handing a client
 			// out to ensure correct bookkeeping for channel closing.
-			if(!client.incrementReferenceCounter()){
+			if (!client.incrementReferenceCounter()) {
 				destroyPartitionRequestClient(remoteAddress, client);
 				client = null;
 			}
@@ -102,7 +106,7 @@ class PartitionRequestClientFactory {
 	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
 		Object entry = clients.get(remoteAddress);
 
-		if(entry instanceof ConnectingChannel) {
+		if (entry instanceof ConnectingChannel) {
 			ConnectingChannel channel = (ConnectingChannel) entry;
 
 			if (channel.dispose()) {
@@ -141,8 +145,9 @@ class PartitionRequestClientFactory {
 			boolean result;
 			synchronized (connectLock) {
 				if (partitionRequestClient != null) {
-					result =  partitionRequestClient.disposeIfNotUsed();
-				} else {
+					result = partitionRequestClient.disposeIfNotUsed();
+				}
+				else {
 					disposeRequestClient = true;
 					result = true;
 				}
@@ -155,16 +160,21 @@ class PartitionRequestClientFactory {
 
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
-				PartitionRequestClientHandler requestHandler =
-						(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
+				try {
+					PartitionRequestClientHandler requestHandler =
+							(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
 
-				partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
+					partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
 
-				if (disposeRequestClient) {
-					partitionRequestClient.disposeIfNotUsed();
-				}
+					if (disposeRequestClient) {
+						partitionRequestClient.disposeIfNotUsed();
+					}
 
-				connectLock.notifyAll();
+					connectLock.notifyAll();
+				}
+				catch (Throwable t) {
+					notifyOfError(t);
+				}
 			}
 		}