Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/24 08:16:52 UTC

[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: MINOR: Fix stuck SSL tests in case of authentication failure

fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880201691


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   These in-loop assertions are redundant IMO.
   
   If any of them failed, the final assertions would catch it.



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1138,20 +1130,18 @@ private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) th
     }
 
     /**
-     * Sends the specified number of requests and waits for the requests to be sent. The channel
-     * is muted during polling to ensure that incoming data is not received.
+     * Sends the specified number of requests and waits for the requests to be sent.
+     * The channel is muted during polling to ensure that incoming data is not received.
      */
-    private KafkaChannel sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
-        channel.mute();
+    private void sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
+        selector.mute(channel.id());
         for (int i = 0; i < numRequests; i++) {
             selector.send(createSend(channel.id(), String.valueOf(i)));
             do {
                 selector.poll(10);
             } while (selector.completedSends().isEmpty());
         }
-        channel.maybeUnmute();
-
-        return channel;
+        selector.unmute(channel.id());

Review Comment:
   This is not just a cleanup, but mainly a bug fix.
   
   Calling the channel's mute/unmute directly, rather than the selector's mute/unmute, bypasses the selector's own state handling, which causes random errors when a test relies on that state.
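
To illustrate the point, here is a minimal toy model, not the real Kafka classes. `ToyChannel`, `ToySelector`, `mutedIds`, and `isConsistent` are hypothetical names invented for this sketch; the only assumption carried over from the comment above is that the selector keeps some bookkeeping of its own about muted channels, which a direct call on the channel cannot update.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: ToyChannel and ToySelector are hypothetical stand-ins, not Kafka classes.
class MuteBookkeepingSketch {

    static class ToyChannel {
        private final String id;
        private boolean muted;

        ToyChannel(String id) { this.id = id; }

        String id() { return id; }
        void mute() { muted = true; }
        void unmute() { muted = false; }
        boolean isMuted() { return muted; }
    }

    static class ToySelector {
        private final Map<String, ToyChannel> channels = new HashMap<>();
        // Assumed selector-side bookkeeping of which channels are muted.
        private final Set<String> mutedIds = new HashSet<>();

        void register(ToyChannel channel) { channels.put(channel.id(), channel); }

        // Muting through the selector updates the channel AND the bookkeeping.
        void mute(String id) {
            channels.get(id).mute();
            mutedIds.add(id);
        }

        void unmute(String id) {
            channels.get(id).unmute();
            mutedIds.remove(id);
        }

        // True when the selector's view agrees with the channel's own flag.
        boolean isConsistent(String id) {
            return mutedIds.contains(id) == channels.get(id).isMuted();
        }
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        ToyChannel channel = new ToyChannel("0");
        selector.register(channel);

        channel.mute(); // bypasses the selector's bookkeeping
        System.out.println("after channel.mute(): consistent=" + selector.isConsistent("0"));

        channel.unmute();
        selector.mute("0"); // keeps both views in sync
        System.out.println("after selector.mute(): consistent=" + selector.isConsistent("0"));
    }
}
```

In this sketch the two views diverge as soon as the channel is muted behind the selector's back, which is the kind of inconsistency a state-dependent test could trip over.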



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   Let me add the KafkaChannel's state machine diagram for clarity.
   
   Given that the TCP connection is already established once connect completes successfully (state: READY|AUTHENTICATE), I don't think it is necessary to wait for channel readiness here, which would unnecessarily delay this test in the case of TLS/SASL authentication. Waiting for readiness is, however, necessary for all the other tests in this class.
   
   ![kafka-channel-states](https://user-images.githubusercontent.com/11456498/169981699-f0e7fc6c-985b-407a-80c7-8294643f40f8.jpg)
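
The difference between "connected" and "ready" can be sketched with a toy state machine. This is only an illustration under stated assumptions: `ToyConnection`, `ToyState`, and `waitUntil` are hypothetical names, the number of polls a handshake takes is arbitrary, and the bound of 10 polls is made up. It does not reproduce `NetworkTestUtils.waitForChannelReady`.

```java
import java.util.function.BooleanSupplier;

// Toy model only: none of these types are Kafka classes.
class BoundedWaitSketch {

    enum ToyState { NOT_CONNECTED, AUTHENTICATE, READY, AUTHENTICATION_FAILED }

    static class ToyConnection {
        private ToyState state = ToyState.NOT_CONNECTED;
        private final int handshakePolls; // polls the simulated handshake needs
        private final boolean authSucceeds;
        private int pollsInHandshake;

        ToyConnection(int handshakePolls, boolean authSucceeds) {
            this.handshakePolls = handshakePolls;
            this.authSucceeds = authSucceeds;
        }

        // Each poll advances the simulated connection by one step.
        void poll() {
            switch (state) {
                case NOT_CONNECTED:
                    state = ToyState.AUTHENTICATE; // TCP connection established
                    break;
                case AUTHENTICATE:
                    if (++pollsInHandshake >= handshakePolls) {
                        state = authSucceeds ? ToyState.READY : ToyState.AUTHENTICATION_FAILED;
                    }
                    break;
                default:
                    break; // READY and AUTHENTICATION_FAILED are terminal here
            }
        }

        // True once the simulated TCP connection has been established.
        boolean isConnected() { return state != ToyState.NOT_CONNECTED; }

        boolean isReady() { return state == ToyState.READY; }
    }

    // Polls until the condition holds or maxPolls is exhausted; never loops forever.
    static boolean waitUntil(ToyConnection conn, BooleanSupplier condition, int maxPolls) {
        for (int i = 0; i < maxPolls && !condition.getAsBoolean(); i++) {
            conn.poll();
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        ToyConnection ok = new ToyConnection(3, true);
        System.out.println("connected: " + waitUntil(ok, ok::isConnected, 10));
        System.out.println("ready:     " + waitUntil(ok, ok::isReady, 10));

        ToyConnection bad = new ToyConnection(3, false);
        System.out.println("ready after failed auth: " + waitUntil(bad, bad::isReady, 10));
    }
}
```

Waiting only for `isConnected` returns after the first step, while waiting for `isReady` also pays for the handshake. The bounded loop additionally returns instead of spinning when authentication fails, which an unbounded `while (!ready) poll()` loop would not.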
   


