You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 01:36:04 UTC

[jira] [Commented] (KAFKA-3378) Client blocks forever if SocketChannel connects instantly

    [ https://issues.apache.org/jira/browse/KAFKA-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300824#comment-16300824 ] 

ASF GitHub Bot commented on KAFKA-3378:
---------------------------------------

ijuma closed pull request #1085: KAFKA-3378 Fix for instantly connecting SocketChannels (v3)
URL: https://github.com/apache/kafka/pull/1085
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8bb33488cb2..accfa40ecd5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -84,6 +85,7 @@
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
+    private final Set<KafkaChannel> connectableChannels;
     private final List<String> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -118,6 +120,7 @@ public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, T
         this.completedSends = new ArrayList<Send>();
         this.completedReceives = new ArrayList<NetworkReceive>();
         this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
+        this.connectableChannels = new HashSet<>();
         this.connected = new ArrayList<String>();
         this.disconnected = new ArrayList<String>();
         this.failedSends = new ArrayList<String>();
@@ -161,8 +164,9 @@ public void connect(String id, InetSocketAddress address, int sendBufferSize, in
         if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
             socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
+        boolean connected;
         try {
-            socketChannel.connect(address);
+            connected = socketChannel.connect(address);
         } catch (UnresolvedAddressException e) {
             socketChannel.close();
             throw new IOException("Can't resolve address: " + address, e);
@@ -174,6 +178,14 @@ public void connect(String id, InetSocketAddress address, int sendBufferSize, in
         KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
         key.attach(channel);
         this.channels.put(id, channel);
+        
+        if (connected) {
+            // Since the channel is already connected OP_CONNECT won't trigger
+            // Add to connectableChannels so poll() will be able to handle this case
+            log.debug("Instantly connected to node {}", channel.id());
+            connectableChannels.add(channel);
+            key.interestOps(0);
+        }
     }
 
     /**
@@ -259,7 +271,7 @@ public void poll(long timeout) throws IOException {
         if (timeout < 0)
             throw new IllegalArgumentException("timeout should be >= 0");
         clear();
-        if (hasStagedReceives())
+        if (hasStagedReceives() || connectableChannels.size() > 0)
             timeout = 0;
         /* check ready keys */
         long startSelect = time.nanoseconds();
@@ -268,6 +280,13 @@ public void poll(long timeout) throws IOException {
         currentTimeNanos = endSelect;
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
+        // Perform setup for instantly connected channels since they won't be included in selectedKeys()
+        if (connectableChannels.size() > 0)
+            for (KafkaChannel channel : connectableChannels) {
+                sensors.maybeRegisterConnectionMetrics(channel.id());
+                lruConnections.put(channel.id(), currentTimeNanos);
+            }
+        
         if (readyKeys > 0) {
             Set<SelectionKey> keys = this.nioSelector.selectedKeys();
             Iterator<SelectionKey> iter = keys.iterator();
@@ -281,11 +300,10 @@ public void poll(long timeout) throws IOException {
                 lruConnections.put(channel.id(), currentTimeNanos);
 
                 try {
-                    /* complete any connections that have finished their handshake */
+                    /* queue for completion any connections that have finished their handshake */
                     if (key.isConnectable()) {
-                        channel.finishConnect();
-                        this.connected.add(channel.id());
-                        this.sensors.connectionCreated.record();
+                        connectableChannels.add(channel);
+                        continue;
                     }
 
                     /* if channel is not ready finish prepare */
@@ -327,6 +345,17 @@ public void poll(long timeout) throws IOException {
 
         addToCompletedReceives();
 
+        // finish connection setup
+        if (connectableChannels.size() > 0) {
+            for (KafkaChannel channel : connectableChannels) {
+                channel.finishConnect();
+                this.connected.add(channel.id());
+                this.sensors.connectionCreated.record();
+                channel.prepare();
+            }
+            connectableChannels.clear();
+        }
+        
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
         maybeCloseOldestConnection();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Client blocks forever if SocketChannel connects instantly
> ---------------------------------------------------------
>
>                 Key: KAFKA-3378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3378
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Larkin Lowrey
>            Assignee: Larkin Lowrey
>            Priority: Blocker
>             Fix For: 0.10.0.0
>
>
> Observed that some consumers were blocked in Fetcher.listOffset() when starting many dozens of consumer threads at the same time.
> Selector.connect(...) calls SocketChannel.connect() in non-blocking mode and assumes that it always returns false, and that the channel will later appear in the Selector's readyKeys (because of the OP_CONNECT interest op) once the connection is ready to be completed.
> When connect() returns true, the channel is already fully connected and will not be included in readyKeys, since only OP_CONNECT is set.
> I implemented a fix which handles the case when connect(...) returns true and verified that I no longer see stuck consumers. A git pull request will be forthcoming.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)