Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/13 15:07:22 UTC

[GitHub] [kafka] fvaleri opened a new pull request, #12159: Fix stuck SSL tests in case of authentication failure

fvaleri opened a new pull request, #12159:
URL: https://github.com/apache/kafka/pull/12159

   When there is an authentication error after the initial TCP connection,
   the selector never becomes READY, so these tests wait forever for that state.
   
   This is actually what happened to me while using an OpenJDK build that does not
   support the required cipher suites.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty (i.e. the client closed the connection), then we would exit the while loop prematurely and the first assertion would fail. 
   
   I can add it back with a new PR if you think it's important.
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing check is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   I can add it back with a new PR if you think it's important.
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: MINOR: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880201978


##########
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java:
##########
@@ -87,6 +88,16 @@ public static void checkClientConnection(Selector selector, String node, int min
         }
     }
 
+    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
+        int secondsLeft = 30;
+        while (selector.channel(node) != null
+                && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertNotNull(selector.channel(node));
+        assertTrue(selector.channel(node).isConnected());

Review Comment:
   Agreed, thanks.





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881556147


##########
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java:
##########
@@ -87,13 +88,23 @@ public static void checkClientConnection(Selector selector, String node, int min
         }
     }
 
+    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
+        int secondsLeft = 30;
+        while (selector.channel(node) != null
+                && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertNotNull(selector.channel(node));
+        assertTrue(selector.channel(node).isConnected(), String.format("Channel %s is not connected after %d seconds", node, secondsLeft));

Review Comment:
   Right, thanks for spotting this.





[GitHub] [kafka] showuon commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880476043


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   You're right! Makes sense!





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty, then we would exit the while loop prematurely and the first assertion would fail. Maybe we could simply add a message which also includes the channel's state to the first assertion after the while loop.
   
   





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r873646603


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -449,9 +450,9 @@ public void close() {
     @Test
     public void testCloseOldestConnection() throws Exception {
         String id = "0";
-        blockingConnect(id);
-
-        time.sleep(6000); // The max idle time is 5000ms
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        selector.poll(0);

Review Comment:
   Note that `connect()` is an asynchronous method here. It might not complete with the first poll event. That is the reason it is tested in a while loop inside `blockingConnect`.
   
   My suggestion would be to keep using `blockingConnect` with a (new) timeout that terminates its while loop, similar to the implementation of `NetworkTestUtils.waitForChannelReady`.
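
The bounded loop suggested above could look roughly like the following. This is only a minimal, library-free sketch: `BoundedWaitSketch` and `pollUntil` are hypothetical names, the `done` check stands in for something like `selector.connected().contains(node)`, and `poll` stands in for `selector.poll(...)`. It is not the code from the PR.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka: shows a poll loop that cannot hang.
final class BoundedWaitSketch {

    /**
     * Runs poll until done reports true or maxPolls attempts are used up.
     * Returns whether the condition was finally met, so the caller can fail
     * the test with a clear message instead of waiting forever.
     */
    static boolean pollUntil(BooleanSupplier done, Runnable poll, int maxPolls) {
        int remaining = maxPolls;
        while (!done.getAsBoolean() && remaining-- > 0) {
            poll.run();
        }
        return done.getAsBoolean();
    }
}
```

A test would then assert on the returned value, so a connection that never completes (for example after an authentication failure) turns into a test failure rather than a stuck build.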





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -449,9 +450,9 @@ public void close() {
     @Test
     public void testCloseOldestConnection() throws Exception {
         String id = "0";
-        blockingConnect(id);
-
-        time.sleep(6000); // The max idle time is 5000ms
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        selector.poll(0);

Review Comment:
   Thanks @divijvaidya for looking at this. I need to fix this and `testCloseOldestConnectionWithMultiplePendingReceives`.





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty, then we would exit the while loop prematurely and the first assertion would fail. 
   
   Maybe we could simply add a message which also includes the channel state to the first assertion after the while loop.
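
As a rough illustration of that idea, the check after the loop could carry the disconnect state in its failure message. The sketch below is self-contained and uses hypothetical names (`ExpiryAssertionSketch`, `ToyState`, `assertAllReceived`). The map stands in for what `selector.disconnected()` returns, and the real test would presumably use JUnit assertions instead of a hand-rolled check.

```java
import java.util.Map;

// Hypothetical sketch, not the PR's code: a post-loop check whose failure
// message reports why the loop ended.
final class ExpiryAssertionSketch {

    // Toy stand-in for a channel state; the values are illustrative only.
    enum ToyState { EXPIRED, AUTHENTICATION_FAILED }

    static String describe(int expected, int actual, Map<String, ToyState> disconnected) {
        return String.format(
            "Expected %d completed receives but got %d; disconnected channels: %s",
            expected, actual, disconnected);
    }

    // Throws with a descriptive message when the loop exited before all
    // receives were completed.
    static void assertAllReceived(int expected, int actual, Map<String, ToyState> disconnected) {
        if (expected != actual) {
            throw new AssertionError(describe(expected, actual, disconnected));
        }
    }
}
```

With a message like this, a premature exit from the while loop shows the state of the disconnected channel directly in the test report.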
   
   





[GitHub] [kafka] ijuma commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r889746624


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Can you elaborate why this is so? The assertions here are different from the ones after the while loop.





[GitHub] [kafka] fvaleri commented on pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #12159:
URL: https://github.com/apache/kafka/pull/12159#issuecomment-1132744614

   All selector tests are passing. The failing tests seem to be unrelated.




[GitHub] [kafka] fvaleri commented on pull request #12159: [WIP] Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #12159:
URL: https://github.com/apache/kafka/pull/12159#issuecomment-1131936314

   @divijvaidya I fixed the test you were referring to.
   
   I also fixed the helper method `sendNoReceive`, which was directly using `channel.mute()` instead of `selector.mute(channel.id())`. The first call does not ensure proper state handling, and I believe it's the reason why `testCloseOldestConnectionWithMultiplePendingReceives` was randomly failing.
   
   Let's see if the test job passes this time.
   
   




[GitHub] [kafka] showuon commented on pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12159:
URL: https://github.com/apache/kafka/pull/12159#issuecomment-1144607127

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / kafka.admin.TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / kafka.controller.ControllerIntegrationTest.testPartitionReassignmentToBrokerWithOfflineLogDir()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
   ```




[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty (i.e. the client closed the connection), then we would exit the while loop prematurely and the first assertion would fail. 
   I can add it back if you think it's important.
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: MINOR: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880201691


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   They are redundant IMO. 
   
   If any of these fails, you would catch it in the final assertions.



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1138,20 +1130,18 @@ private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) th
     }
 
     /**
-     * Sends the specified number of requests and waits for the requests to be sent. The channel
-     * is muted during polling to ensure that incoming data is not received.
+     * Sends the specified number of requests and waits for the requests to be sent.
+     * The channel is muted during polling to ensure that incoming data is not received.
      */
-    private KafkaChannel sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
-        channel.mute();
+    private void sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
+        selector.mute(channel.id());
         for (int i = 0; i < numRequests; i++) {
             selector.send(createSend(channel.id(), String.valueOf(i)));
             do {
                 selector.poll(10);
             } while (selector.completedSends().isEmpty());
         }
-        channel.maybeUnmute();
-
-        return channel;
+        selector.unmute(channel.id());

Review Comment:
   This is not just a cleanup, but mainly a bug fix. 
   
   Using the channel's mute/unmute rather than the selector's mute/unmute does not ensure proper selector state handling, which causes random errors when a test relies on that state.
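   
   To illustrate the point above, here is a toy model (not Kafka's implementation) of a selector that keeps its own record of muted channels. `ToyChannel`, `ToySelector`, and the `explicitlyMuted` set are hypothetical names invented for this sketch; the only assumption is that the selector tracks mute state separately from the channel, so bypassing it leaves the two out of sync.
   
   ```java
   import java.util.HashSet;
   import java.util.Set;

   public class MuteBookkeepingSketch {
       /** Hypothetical channel that only knows its own muted flag. */
       static final class ToyChannel {
           final String id;
           boolean muted;

           ToyChannel(String id) {
               this.id = id;
           }
       }

       /** Hypothetical selector that records which channels it has muted. */
       static final class ToySelector {
           private final Set<String> explicitlyMuted = new HashSet<>();

           void mute(ToyChannel channel) {
               channel.muted = true;
               explicitlyMuted.add(channel.id);
           }

           void unmute(ToyChannel channel) {
               explicitlyMuted.remove(channel.id);
               channel.muted = false;
           }

           /** True when the selector's record agrees with the channel's flag. */
           boolean consistentWith(ToyChannel channel) {
               return channel.muted == explicitlyMuted.contains(channel.id);
           }
       }

       public static void main(String[] args) {
           ToySelector selector = new ToySelector();
           ToyChannel viaSelector = new ToyChannel("0");
           selector.mute(viaSelector);
           System.out.println(selector.consistentWith(viaSelector)); // prints true

           ToyChannel direct = new ToyChannel("1");
           direct.muted = true; // bypasses the selector's bookkeeping
           System.out.println(selector.consistentWith(direct)); // prints false
       }
   }
   ```
   
   In the toy model, any test that later relies on the selector's record would see stale state after a direct mute, which is the kind of intermittent failure described above.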



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   Let me add the KafkaChannel's state machine diagram for clarity. 
   
   Given that the TCP connection is already established after connect completes successfully (state: READY|AUTHENTICATE), I don't think it is necessary to wait for channel readiness, which would unnecessarily delay this test in case of TLS/SASL authentication. It is necessary, however, for all the other tests in this class.
   
   ![kafka-channel-states](https://user-images.githubusercontent.com/11456498/169981699-f0e7fc6c-985b-407a-80c7-8294643f40f8.jpg)
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -449,9 +450,9 @@ public void close() {
     @Test
     public void testCloseOldestConnection() throws Exception {
         String id = "0";
-        blockingConnect(id);
-
-        time.sleep(6000); // The max idle time is 5000ms
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        selector.poll(0);

Review Comment:
   Hi @divijvaidya, thanks for looking at this. I need to fix this and `testCloseOldestConnectionWithMultiplePendingReceives`.






[GitHub] [kafka] showuon merged pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
showuon merged PR #12159:
URL: https://github.com/apache/kafka/pull/12159




[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current state. 
   
   In `SelectorTest.testCloseOldestConnection()` we test idle connection expiration and the channel can be in any state, so there is no need to wait for readiness (the initial TCP connection is enough).
   
   @tombentley agreed.





[GitHub] [kafka] showuon commented on pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12159:
URL: https://github.com/apache/kafka/pull/12159#issuecomment-1144607930

   @tombentley , do you want to have another look? I'm going to merge this PR next week if you don't have any other comments. Thank you.




[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty, then we would exit the while loop prematurely and the first assertion would fail. 
   
   Maybe we could simply add a message which also includes the channel's state to the first assertion after the while loop.
   
   





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r874001986


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -449,9 +450,9 @@ public void close() {
     @Test
     public void testCloseOldestConnection() throws Exception {
         String id = "0";
-        blockingConnect(id);
-
-        time.sleep(6000); // The max idle time is 5000ms
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        selector.poll(0);

Review Comment:
   Hi @divijvaidya, thanks. 
   
   I need to fix this and `testCloseOldestConnectionWithMultiplePendingReceives`.





[GitHub] [kafka] showuon commented on a diff in pull request #12159: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880005058


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1138,20 +1130,18 @@ private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) th
     }
 
     /**
-     * Sends the specified number of requests and waits for the requests to be sent. The channel
-     * is muted during polling to ensure that incoming data is not received.
+     * Sends the specified number of requests and waits for the requests to be sent.
+     * The channel is muted during polling to ensure that incoming data is not received.
      */
-    private KafkaChannel sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
-        channel.mute();
+    private void sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
+        selector.mute(channel.id());
         for (int i = 0; i < numRequests; i++) {
             selector.send(createSend(channel.id(), String.valueOf(i)));
             do {
                 selector.poll(10);
             } while (selector.completedSends().isEmpty());
         }
-        channel.maybeUnmute();
-
-        return channel;
+        selector.unmute(channel.id());

Review Comment:
   nice clean up.



##########
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java:
##########
@@ -87,6 +88,16 @@ public static void checkClientConnection(Selector selector, String node, int min
         }
     }
 
+    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
+        int secondsLeft = 30;
+        while (selector.channel(node) != null
+                && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertNotNull(selector.channel(node));
+        assertTrue(selector.channel(node).isConnected());

Review Comment:
   I'd add an error message to the assertion to indicate what we expected, e.g.:
   `assertTrue(selector.channel(node).isConnected(), "channel " + node + " is not connected after 30 secs");`
   
   The same applies to the `waitForChannelReady` method below. (I know it's not your change, but let's improve it together.)



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Why did we remove these assertions?



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   I think we should replace the two while loops with:
   `waitForChannelConnected` + `waitForChannelReady`.
   This way, we can tell whether the connection was established when an exception is thrown during `waitForChannelReady`, right?
   
   The same comment applies to the similar replacements below.
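   
   A minimal sketch of the two-phase wait suggested above, using a stand-in channel rather than the real `Selector`. `FakeChannel`, `waitFor`, and the poll counts are hypothetical and exist only for this illustration; the point is that each phase has its own bounded wait, so a failure names the phase that timed out instead of hanging.
   
   ```java
   import java.util.function.BooleanSupplier;

   public class TwoPhaseWaitSketch {
       /** Hypothetical channel: connects after some polls, becomes ready after more (or never). */
       static final class FakeChannel {
           private final int pollsUntilConnected;
           private final int pollsUntilReady; // negative: authentication never succeeds
           private int polls = 0;

           FakeChannel(int pollsUntilConnected, int pollsUntilReady) {
               this.pollsUntilConnected = pollsUntilConnected;
               this.pollsUntilReady = pollsUntilReady;
           }

           void poll() {
               polls++;
           }

           boolean isConnected() {
               return polls >= pollsUntilConnected;
           }

           boolean isReady() {
               return pollsUntilReady >= 0 && polls >= pollsUntilReady;
           }
       }

       /** Polls until the condition holds or maxPolls is used up, then fails naming the phase. */
       static void waitFor(String phase, BooleanSupplier condition, Runnable poll, int maxPolls) {
           for (int i = 0; i < maxPolls && !condition.getAsBoolean(); i++) {
               poll.run();
           }
           if (!condition.getAsBoolean()) {
               throw new AssertionError("Timed out waiting for phase: " + phase);
           }
       }

       /** Runs both phases and returns "ok" or the failure message. */
       static String connectAndWait(FakeChannel channel) {
           try {
               waitFor("connected", channel::isConnected, channel::poll, 30);
               waitFor("ready", channel::isReady, channel::poll, 30);
               return "ok";
           } catch (AssertionError e) {
               return e.getMessage();
           }
       }

       public static void main(String[] args) {
           System.out.println(connectAndWait(new FakeChannel(2, 5)));
           System.out.println(connectAndWait(new FakeChannel(2, -1)));
       }
   }
   ```
   
   With a channel that never authenticates, the first phase passes and the second reports "Timed out waiting for phase: ready", which shows that the TCP connection itself was fine.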





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current state. 
   
   For example, in `SelectorTest.testCloseOldestConnection()` we test idle connection expiration and the channel can be in any state, so there is no need to wait for readiness (the initial TCP connection is enough).





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current state, like `SelectorTest.testCloseOldestConnection()` (we can test idle connection expiration from any state).





[GitHub] [kafka] tombentley commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881387146


##########
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java:
##########
@@ -87,13 +88,23 @@ public static void checkClientConnection(Selector selector, String node, int min
         }
     }
 
+    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
+        int secondsLeft = 30;
+        while (selector.channel(node) != null
+                && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertNotNull(selector.channel(node));
+        assertTrue(selector.channel(node).isConnected(), String.format("Channel %s is not connected after %d seconds", node, secondsLeft));

Review Comment:
   When the assertion fails, `secondsLeft` will no longer be 30 at this point: the loop condition has decremented it to -1, resulting in a confusing assertion failure message.
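   
   A small sketch of the counter behaviour described above, with the connection check replaced by a plain `BooleanSupplier` so it runs without Kafka. `TimeoutMessageSketch`, `TIMEOUT_SECONDS`, and both helper methods are names made up for this illustration. It shows the value the counter holds once the loop gives up, and one possible way to build the message from a constant instead.
   
   ```java
   import java.util.function.BooleanSupplier;

   public class TimeoutMessageSketch {
       public static final int TIMEOUT_SECONDS = 30;

       /** Same loop shape as in the diff; returns the counter value once the loop exits. */
       public static int counterAfterLoop(BooleanSupplier connected) {
           int secondsLeft = TIMEOUT_SECONDS;
           while (!connected.getAsBoolean() && secondsLeft-- > 0) {
               // a real test would poll the selector here
           }
           return secondsLeft;
       }

       /** Builds the failure message from the constant, not from the mutated counter. */
       public static String failureMessage(String node) {
           return String.format("Channel %s is not connected after %d seconds", node, TIMEOUT_SECONDS);
       }

       public static void main(String[] args) {
           // The post-decrement runs once more on the final, failing comparison.
           System.out.println(counterAfterLoop(() -> false)); // prints -1
           System.out.println(failureMessage("0"));
       }
   }
   ```
   
   Keeping the timeout in a constant (or a separate local that is never mutated) means the message always reports the intended 30 seconds.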





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel, like `SelectorTest.testCloseOldestConnection()`, where we test for idle connection expiration, so there is no need to wait for readiness.
   
   @tombentley agreed.





[GitHub] [kafka] tombentley commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881437895


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   The question is whether this test should assume the correctness of the implementation of that state machine in KafkaChannel. It's probably a safe assumption, but I'd be inclined to not make that assumption. 





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current state. 
   
   For example, in `SelectorTest.testCloseOldestConnection()` we test idle connection expiration and the channel can be in any state, so there is no need to wait for readiness (the initial TCP connection is enough).
   
   @tombentley agreed.





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current channel state. 
   
   In `SelectorTest.testCloseOldestConnection()` we test idle connection expiration and the channel can be in any state, so there is no need to wait for readiness (the initial TCP connection is enough).
   
   @tombentley agreed.





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r881564718


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   @showuon I probably expressed that badly. I added `NetworkTestUtils.waitForChannelConnected()` for tests that don't need to use the channel and don't care about the current channel state. 
   
   In `SelectorTest.testCloseOldestConnection()` we test idle connection expiration and the channel can be in any state, so there is no need to wait for readiness.
   
   @tombentley agreed.





[GitHub] [kafka] fvaleri commented on a diff in pull request #12159: KAFKA-13933: Fix stuck SSL tests in case of authentication failure

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r890306188


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Hi @ijuma, thanks for looking into this.
   
   You are right, I think the only missing assertion is the following.
   
   ```
   assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
   ```
   
   If disconnected() is not empty, then we would exit the while loop prematurely and the first assertion would fail.
   In this case, the error message would not help in identifying the root cause, so I can add it back if you think it's important.
   


