Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/07 09:23:55 UTC

[GitHub] [kafka] itweixiang commented on a change in pull request #9902: KAFKA-12193: Re-resolve IPs after a client disconnects

itweixiang commented on a change in pull request #9902:
URL: https://github.com/apache/kafka/pull/9902#discussion_r763795681



##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -81,6 +85,19 @@
     private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
     private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
     private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
+    private ArrayList<InetAddress> initialAddresses = new ArrayList<>(Arrays.asList(

Review comment:
   Hi bob-barrett, I have a question about this issue. Could you help me?
   Our Kafka cluster is deployed in k8s. When the Kafka cluster restarts, the producers and consumers are disconnected.
   A cluster restart generates new IPs, but the Kafka client keeps the old IPs, so we have to restart the producers and consumers every time, which is tiring.
   
   In Kafka client version 2.8.1, I see that in org.apache.kafka.clients.NetworkClient#initiateConnect, `node.host()` may return an IP instead of a domain name. If it returns an IP, re-resolving it yields the same IP, so your change would have no effect.
   
   My English is not very good; I hope my meaning is clear.
   
   ```
   private void initiateConnect(Node node, long now) {
       String nodeConnectionId = node.idString();
       try {
           connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
           InetAddress address = connectionStates.currentAddress(nodeConnectionId);
           log.debug("Initiating connection to node {} using address {}", node, address);
           selector.connect(nodeConnectionId,
                   new InetSocketAddress(address, node.port()),
                   this.socketSendBuffer,
                   this.socketReceiveBuffer);
       } catch (IOException e) {
           log.warn("Error connecting to node {}", node, e);
           // Attempt failed, we'll try again after the backoff
           connectionStates.disconnected(nodeConnectionId, now);
           // Notify metadata updater of the connection failure
           metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
       }
   }
   ```
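
To illustrate the concern, here is a minimal standalone sketch. It assumes the client ultimately resolves the host string through `InetAddress.getAllByName`; the class `IpLiteralLookup`, the method `resolvesToItself`, and the address `10.0.0.12` are hypothetical names made up for this example and are not part of Kafka. When the host string is already an IPv4 literal, the JDK only validates the format and returns that same address without a DNS query, so resolving it again after a disconnect cannot pick up a new IP.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class IpLiteralLookup {

    // Hypothetical helper: returns true when resolving `host` yields exactly
    // one address whose textual form equals `host`, which is what happens
    // when `host` is already an IPv4 literal. IPv6 literals are not handled
    // here because their textual form may be normalized by the JDK.
    static boolean resolvesToItself(String host) {
        try {
            InetAddress[] addresses = InetAddress.getAllByName(host);
            return addresses.length == 1
                    && addresses[0].getHostAddress().equals(host);
        } catch (UnknownHostException e) {
            // The name could not be resolved at all, so it is not an IP literal.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed stale pod IP that a broker advertised before the restart.
        System.out.println(resolvesToItself("10.0.0.12")); // prints "true"
    }
}
```

If this assumption holds, re-resolution only helps when the broker advertises a DNS name (for example a k8s service name) rather than a pod IP.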
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org