You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/12 08:29:08 UTC

[flink] branch release-1.11 updated (06d5ce5 -> ded539e)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 06d5ce5  [FLINK-18682][orc][hive] Vector orc reader cannot read Hive 2.0.0 table
     new a86c430  [hotfix][tests] Unignore PartitionRequestClientFactoryTest class
     new ded539e  [FLINK-18821][network] Fix indefinite wait in PartitionRequestClientFactory.createPartitionRequestClient

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../netty/PartitionRequestClientFactory.java       | 10 +++++++-
 .../netty/PartitionRequestClientFactoryTest.java   | 29 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 2 deletions(-)


[flink] 01/02: [hotfix][tests] Unignore PartitionRequestClientFactoryTest class

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a86c430afee10d4f5445d3693bc0f1c83e863aa0
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Aug 10 15:41:23 2020 +0200

    [hotfix][tests] Unignore PartitionRequestClientFactoryTest class
---
 .../runtime/io/network/netty/PartitionRequestClientFactoryTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index d971634..e2bcc23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -45,12 +45,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@Ignore
 public class PartitionRequestClientFactoryTest {
 
 	private final static int SERVER_PORT = NetUtils.getAvailablePort();
 
 	@Test
+	@Ignore
 	public void testResourceReleaseAfterInterruptedConnect() throws Exception {
 
 		// Latch to synchronize on the connect call.


[flink] 02/02: [FLINK-18821][network] Fix indefinite wait in PartitionRequestClientFactory.createPartitionRequestClient

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ded539e85444f9c8d34f88da31784775ed281aba
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Aug 10 15:44:22 2020 +0200

    [FLINK-18821][network] Fix indefinite wait in PartitionRequestClientFactory.createPartitionRequestClient
---
 .../netty/PartitionRequestClientFactory.java       | 10 +++++++-
 .../netty/PartitionRequestClientFactoryTest.java   | 27 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 229121e..4f65c04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -81,7 +81,15 @@ class PartitionRequestClientFactory {
 				Object old = clients.putIfAbsent(connectionId, connectingChannel);
 
 				if (old == null) {
-					nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+					ChannelFuture channelFuture;
+					try {
+						channelFuture = nettyClient.connect(connectionId.getAddress());
+					} catch (Exception e) {
+						// https://issues.apache.org/jira/browse/FLINK-18821
+						connectingChannel.notifyOfError(e);
+						throw new IOException("Connecting the channel failed: " + e.getMessage(), e);
+					}
+					channelFuture.addListener(connectingChannel);
 
 					client = connectingChannel.waitForChannel();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index e2bcc23..0038908 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -49,6 +51,18 @@ public class PartitionRequestClientFactoryTest {
 
 	private final static int SERVER_PORT = NetUtils.getAvailablePort();
 
+	// see https://issues.apache.org/jira/browse/FLINK-18821
+	@Test(expected = IOException.class)
+	public void testFailureReportedToSubsequentRequests() throws Exception {
+		PartitionRequestClientFactory factory = new PartitionRequestClientFactory(new FailingNettyClient());
+		try {
+			factory.createPartitionRequestClient(new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
+		} catch (IOException e) {
+			// expected
+		}
+		factory.createPartitionRequestClient(new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
+	}
+
 	@Test
 	@Ignore
 	public void testResourceReleaseAfterInterruptedConnect() throws Exception {
@@ -189,4 +203,17 @@ public class PartitionRequestClientFactoryTest {
 	private static ConnectionID createServerConnectionID(int connectionIndex) throws UnknownHostException {
 		return new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), SERVER_PORT), connectionIndex);
 	}
+
+	private static class FailingNettyClient extends NettyClient {
+
+		public FailingNettyClient() {
+			super(null);
+		}
+
+		@Override
+		ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
+			throw new ChannelException("Simulate connect failure");
+		}
+	}
+
 }