Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/07 09:54:45 UTC

[GitHub] [kafka] dajac opened a new pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

dajac opened a new pull request #8990:
URL: https://github.com/apache/kafka/pull/8990


   This PR fixes a bug introduced in https://github.com/apache/kafka/pull/8683/.
   
   While processing connection setup timeouts, we iterate over the connecting nodes and disconnect each timed-out node inside the loop. Disconnecting removes the node from the very set the loop is iterating over, which raises a `ConcurrentModificationException`. The current unit test did not catch this because it used only one node.
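A minimal, self-contained sketch of the failure mode described above. It uses a plain `HashSet` as a stand-in for the connecting-node set; this is an assumption for illustration, not the Kafka code. The sketch also suggests why a single-node test can miss the bug. In OpenJDK's `HashSet` iterator, `hasNext()` does not check for modification and `next()` does. With one element, nothing is left after the removal, so the loop ends before `next()` runs its check. Fail-fast behaviour is best-effort per the Javadoc, so this is illustrative rather than guaranteed.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone demo, not Kafka code: a plain HashSet stands in
// for the set of connecting nodes.
class RemoveWhileIteratingDemo {

    // Mimics the buggy pattern: the loop body removes the current element
    // from the very set the enhanced for loop is iterating over.
    static boolean throwsWhenRemovingInsideLoop(List<String> nodeIds) {
        Set<String> connecting = new HashSet<>(nodeIds);
        try {
            for (String id : connecting) {
                connecting.remove(id); // stands in for "disconnect(id)"
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // The fix pattern: snapshot the ids to remove first, then remove them
    // while iterating over the snapshot instead of the live set.
    static Set<String> removeViaSnapshot(List<String> nodeIds) {
        Set<String> connecting = new HashSet<>(nodeIds);
        Set<String> toRemove = new HashSet<>(connecting);
        for (String id : toRemove) {
            connecting.remove(id);
        }
        return connecting; // empty: every id was removed without an exception
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenRemovingInsideLoop(List.of("1")));          // false
        System.out.println(throwsWhenRemovingInsideLoop(List.of("1", "2")));     // true
        System.out.println(removeViaSnapshot(List.of("1", "2", "3")).isEmpty()); // true
    }
}
```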
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654954129


   @dajac Thanks for the fix - wow, green build, haven't seen that in a while! Merging to trunk.





[GitHub] [kafka] dajac commented on a change in pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r450756672



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long now) {
         return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
     }
 
+    /**
+     * Return the Set of nodes whose connection setup has timed out.
+     * @param now the current time in ms
+     */
+    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+        Set<String> nodes = new HashSet<>();
+        for (String nodeId : connectingNodes) {

Review comment:
       Sure thing.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r450752852



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());

Review comment:
       nit: `assertEquals(0, ...)` may be better here so that we know how many there were in case of failure?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long now) {
         return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
     }
 
+    /**
+     * Return the Set of nodes whose connection setup has timed out.
+     * @param now the current time in ms
+     */
+    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+        Set<String> nodes = new HashSet<>();
+        for (String nodeId : connectingNodes) {

Review comment:
       We can use `connectingNodes.stream().filter`?
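A minimal sketch of the `stream().filter` suggestion, built on a simplified class. The members `connectingNodes`, `lastConnectAttemptMs`, `disconnected` and `handleTimeouts`, and the fixed (non-jittered) timeout, are hypothetical stand-ins for the real connection-state bookkeeping. Collecting into a new set gives the caller a snapshot, so disconnecting inside the caller's loop no longer touches the collection being iterated.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the connection-state bookkeeping; the
// field and method names are illustrative, not Kafka's actual implementation.
class ConnectionTimeoutSketch {
    private final Set<String> connectingNodes = new HashSet<>();
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    private final long connectionSetupTimeoutMs; // fixed timeout, no jitter

    ConnectionTimeoutSketch(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    void connecting(String id, long now) {
        connectingNodes.add(id);
        lastConnectAttemptMs.put(id, now);
    }

    void disconnected(String id) {
        connectingNodes.remove(id);
    }

    boolean isConnectionSetupTimeout(String id, long now) {
        return now - lastConnectAttemptMs.get(id) > connectionSetupTimeoutMs;
    }

    // Filter with a stream and collect into a new Set, so callers get a
    // snapshot that is independent of connectingNodes.
    Set<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
                .collect(Collectors.toSet());
    }

    // Safe caller: disconnected() mutates connectingNodes, but the loop walks
    // the snapshot, so no ConcurrentModificationException is raised.
    void handleTimeouts(long now) {
        for (String id : nodesWithConnectionSetupTimeout(now)) {
            disconnected(id);
        }
    }

    Set<String> connectingNodes() {
        return connectingNodes;
    }
}
```

For example, take a 100 ms timeout with nodes "1" and "2" started at 0 ms and node "3" started at 50 ms. Calling `handleTimeouts(120)` disconnects "1" and "2" and leaves only "3" connecting.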

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections

Review comment:
       nit: connections => connection

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections
+        connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for the first two connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.
+        Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
+        assertEquals(2, timedOutConnections.size());
+        assertTrue(timedOutConnections.contains(nodeId1));
+        assertTrue(timedOutConnections.contains(nodeId2));
+
+        // Disconnect the first two connections
+        connectionStates.disconnected(nodeId1, time.milliseconds());
+        connectionStates.disconnected(nodeId2, time.milliseconds());
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for for the third connections

Review comment:
       typo: `for for`

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections
+        connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for the first two connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.
+        Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
+        assertEquals(2, timedOutConnections.size());
+        assertTrue(timedOutConnections.contains(nodeId1));
+        assertTrue(timedOutConnections.contains(nodeId2));
+
+        // Disconnect the first two connections
+        connectionStates.disconnected(nodeId1, time.milliseconds());
+        connectionStates.disconnected(nodeId2, time.milliseconds());
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for for the third connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.

Review comment:
       one?
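A worked timeline of the test as a small Java sketch, to check the expected count at each step. The nominal timeout of 10_000 ms and the 20% jitter are assumed values. Jitter is written as an integer percentage so the arithmetic is exact; the test itself uses `connectionSetupTimeoutMs` and `connectionSetupTimeoutJitter`. Each sleep is sized so that the nodes under check reach the jitter ceiling T * (1 + j).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Worked timeline for testTimedOutConnections. Assumed values (not taken from
// the test): a nominal timeout of 10_000 ms and 20% jitter.
class TimedOutConnectionsTimeline {
    static final long TIMEOUT_MS = 10_000;
    static final long JITTER_PERCENT = 20;
    // Jitter ceiling: T * (1 + j)
    static final long MAX_TIMEOUT_MS = TIMEOUT_MS + TIMEOUT_MS * JITTER_PERCENT / 100;

    // Nodes whose elapsed connecting time has reached the jitter ceiling,
    // which is what the test's sleeps are sized to guarantee.
    static Set<String> reachedCeiling(Map<String, Long> startMs, long now) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> e : startMs.entrySet()) {
            if (now - e.getValue() >= MAX_TIMEOUT_MS) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> connecting = new LinkedHashMap<>();
        long now = 0;
        connecting.put("node1", now);
        connecting.put("node2", now);
        System.out.println(reachedCeiling(connecting, now));       // []
        now += TIMEOUT_MS / 2;                                     // t = 5_000
        connecting.put("node3", now);
        now += TIMEOUT_MS / 2 + TIMEOUT_MS * JITTER_PERCENT / 100; // t = 12_000
        System.out.println(reachedCeiling(connecting, now));       // [node1, node2]
        connecting.remove("node1");
        connecting.remove("node2");
        now += TIMEOUT_MS / 2 + TIMEOUT_MS * JITTER_PERCENT / 100; // t = 19_000
        System.out.println(reachedCeiling(connecting, now));       // [node3]
    }
}
```

At t = 12_000 ms, node3 has only been connecting for 7_000 ms. That is below even the lowest jittered timeout under these assumptions (8_000 ms), so node3 is excluded. By the second check only node3 is left, so the comment should expect one timed-out connection.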







[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654748250


   ok to test





[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654860922


   retest this please





[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654755355


   retest this please





[GitHub] [kafka] rajinisivaram merged pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #8990:
URL: https://github.com/apache/kafka/pull/8990


   





[GitHub] [kafka] ijuma commented on a change in pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r451059876



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -440,6 +442,16 @@ public boolean isConnectionSetupTimeout(String id, long now) {
         return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
     }
 
+    /**
+     * Return the Set of nodes whose connection setup has timed out.
+     * @param now the current time in ms
+     */
+    public Set<String> nodesWithConnectionSetupTimeout(long now) {

Review comment:
       Do we need a `Set` here? Not sure we need to pay the cost of creating the `HashSet`, etc.
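One possible answer to the allocation question, sketched under assumptions. The caller only iterates over the result once, so an `ArrayList` snapshot would be enough. It is typically cheaper to build than a `HashSet` because there is no hashing and no per-entry node object. The method shape and its `Predicate` parameter are hypothetical, not the signature used in the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: the snapshot is only iterated once, so a List suffices.
class ListSnapshotSketch {
    static List<String> nodesWithConnectionSetupTimeout(Set<String> connectingNodes,
                                                        Predicate<String> timedOut) {
        List<String> nodes = new ArrayList<>();
        for (String id : connectingNodes) {
            if (timedOut.test(id)) {
                nodes.add(id); // only reads connectingNodes; no mutation here
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        Set<String> connecting = new HashSet<>(List.of("1", "2", "3"));
        List<String> expired = nodesWithConnectionSetupTimeout(connecting, id -> !id.equals("3"));
        // Mutating the live set is safe: the loop walks the list, not the set.
        for (String id : expired) {
            connecting.remove(id);
        }
        System.out.println(connecting); // [3]
    }
}
```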







[GitHub] [kafka] dajac commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654752598


   @rajinisivaram Thanks for the review. I just pushed an update that addresses your feedback.

