Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/18 04:34:22 UTC

[GitHub] [kafka] gharris1727 opened a new pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

gharris1727 opened a new pull request #9765:
URL: https://github.com/apache/kafka/pull/9765


   When two cooperative rebalances take place in quick succession, the first rebalance may not complete before the second one starts.
   Under Eager rebalancing, no tasks would have been started in that case, so the stop logic in the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved is false.
   Under Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping every connector/task that is revoked in the second rebalance.
   The DistributedHerder still removes the revoked connectors/tasks from its assignment, so the DistributedHerder and the Worker end up with different views of which connectors/tasks are running.
   As a result, the connector/task instances that should have been stopped disappear from the rebalance protocol and are left running until their workers are halted or the instances fail.
   Connectors/tasks that are then reassigned to other workers by the rebalance protocol are duplicated, and run concurrently with the zombie connectors/tasks.
   Connectors/tasks that are reassigned back to the same worker hit exceptions in Worker indicating that the connector/task already exists and is already running.
   
   * Add a test for revoking and then reassigning a connector under normal circumstances
   * Add a test for revoking and then reassigning a connector following an incomplete cooperative rebalance
   * Change expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
   * Only skip revocation for the Eager protocol, and never skip revocation for cooperative/sessioned protocols
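
The divergence described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the DistributedHerder code: the class `RevocationSketch`, its fields, the `applyFix` flag, and the `EAGER`/`COOPERATIVE` constants are hypothetical stand-ins. Only the shape of the guard (`rebalanceResolved || protocol version >= V1`) is taken from the diff in this PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model only: hypothetical stand-in for the herder/worker pair, not Kafka Connect code.
public class RevocationSketch {
    static final int EAGER = 0;        // assumed to play the role of CONNECT_PROTOCOL_V0
    static final int COOPERATIVE = 1;  // assumed to play the role of CONNECT_PROTOCOL_V1

    final Set<String> assignment = new HashSet<>(); // what the herder believes it owns
    final Set<String> running = new HashSet<>();    // what the worker is actually running
    final boolean applyFix;                         // hypothetical switch: old guard vs. new guard
    boolean rebalanceResolved = true;
    int protocolVersion = COOPERATIVE;

    RevocationSketch(boolean applyFix) {
        this.applyFix = applyFix;
    }

    void start(String name) {
        assignment.add(name);
        running.add(name);
    }

    void onRevoked(Set<String> revoked) {
        // Old guard: stop only if the previous rebalance resolved.
        // New guard: additionally, always stop under cooperative protocols.
        boolean stop = rebalanceResolved || (applyFix && protocolVersion >= COOPERATIVE);
        if (stop) {
            running.removeAll(revoked);
        }
        // The assignment is updated either way, which is how the two views can diverge.
        assignment.removeAll(revoked);
    }

    // Instances still running that the herder no longer tracks.
    Set<String> zombies() {
        Set<String> result = new HashSet<>(running);
        result.removeAll(assignment);
        return result;
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            RevocationSketch herder = new RevocationSketch(fix);
            herder.start("conn-0");
            herder.rebalanceResolved = false; // the prior rebalance never completed
            herder.onRevoked(Collections.singleton("conn-0"));
            System.out.println("fix=" + fix + " zombies=" + herder.zombies());
        }
    }
}
```

With the old guard the model leaves `conn-0` running but untracked; with the new guard the worker and herder views stay in sync.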
   
   Signed-off-by: Greg Harris <gr...@confluent.io>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #9765:
URL: https://github.com/apache/kafka/pull/9765#issuecomment-767727180


   jdk 8 and jdk 15 tests are green. The single failure on jdk 11 is in a Streams test that is possibly flaky.





[GitHub] [kafka] kkonstantine merged pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #9765:
URL: https://github.com/apache/kafka/pull/9765


   





[GitHub] [kafka] gharris1727 commented on pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on pull request #9765:
URL: https://github.com/apache/kafka/pull/9765#issuecomment-747863472


   Note: as-is, this PR does not reproduce the exact exceptions noted in the JIRA, because the Worker is mocked out for this test. I believe it is sufficient to show that, without this fix, DistributedHerder::onRevoked does not stop the tasks. This assumes that if we don't stop the task in the Worker (and the task does not fail), a subsequent task start would cause the Worker to throw the exception found in the JIRA.
   
   cc @C0urante @ncliang PTAL, thanks!





[GitHub] [kafka] ncliang commented on a change in pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

Posted by GitBox <gi...@apache.org>.
ncliang commented on a change in pull request #9765:
URL: https://github.com/apache/kafka/pull/9765#discussion_r546001555



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -566,6 +566,112 @@ public Boolean answer() throws Throwable {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testRevoke() throws TimeoutException {
+        revokeAndReassign(false);
+    }
+
+    @Test
+    public void testIncompleteRebalanceBeforeRevoke() throws TimeoutException {
+        revokeAndReassign(true);
+    }
+
+    public void revokeAndReassign(boolean incompleteRebalance) throws TimeoutException {

Review comment:
       It's hard to tell if this actually reproduces the issue or not due to the heavy mocking required. Is there a more direct way to reproduce? Maybe in `RebalanceSourceConnectorsIntegrationTest` or similar? Even if the IT ends up being flaky, having that repro would boost confidence in this fix.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1740,7 +1741,7 @@ public void onRevoked(String leader, Collection<String> connectors, Collection<C
             // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
             // it is still important to have a leader that can write configs, offsets, etc.
 
-            if (rebalanceResolved) {
+            if (rebalanceResolved || currentProtocolVersion >= CONNECT_PROTOCOL_V1) {

Review comment:
       Maybe add a comment explaining why the additional check is needed.
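
One possible wording for such a comment is sketched below, with the guard pulled out into a standalone predicate so the four cases can be enumerated. The class `RevokeGuardSketch` and method `shouldStopRevoked` are hypothetical, and the constant values 0 and 1 are assumptions made for this illustration; only the boolean expression itself comes from the diff above.

```java
// Hypothetical standalone version of the guard; not part of DistributedHerder.
public class RevokeGuardSketch {
    static final short CONNECT_PROTOCOL_V0 = 0; // eager protocol (value assumed for this sketch)
    static final short CONNECT_PROTOCOL_V1 = 1; // first cooperative protocol (value assumed)

    static boolean shouldStopRevoked(boolean rebalanceResolved, short currentProtocolVersion) {
        // Under the eager protocol, an unresolved rebalance means nothing was started,
        // so there is nothing to stop and the revocation can be skipped.
        // Under cooperative (and later) protocols, connectors/tasks keep running across
        // rebalances, so revoked ones must be stopped even if the previous rebalance
        // never resolved. Otherwise they keep running after leaving the assignment.
        return rebalanceResolved || currentProtocolVersion >= CONNECT_PROTOCOL_V1;
    }

    public static void main(String[] args) {
        for (boolean resolved : new boolean[] {true, false}) {
            for (short version : new short[] {CONNECT_PROTOCOL_V0, CONNECT_PROTOCOL_V1}) {
                System.out.println("resolved=" + resolved + " version=" + version
                        + " stop=" + shouldStopRevoked(resolved, version));
            }
        }
    }
}
```

The only combination that skips the stop is an unresolved rebalance under the eager protocol, which matches the intent stated in the PR description.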







[GitHub] [kafka] gharris1727 commented on a change in pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #9765:
URL: https://github.com/apache/kafka/pull/9765#discussion_r551448513



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -566,6 +566,112 @@ public Boolean answer() throws Throwable {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testRevoke() throws TimeoutException {
+        revokeAndReassign(false);
+    }
+
+    @Test
+    public void testIncompleteRebalanceBeforeRevoke() throws TimeoutException {
+        revokeAndReassign(true);
+    }
+
+    public void revokeAndReassign(boolean incompleteRebalance) throws TimeoutException {

Review comment:
       Yeah, this test was a bit difficult to wrap my head around at first, but I think it's the best way to target this section of the code. I don't believe that adding a new flaky test is prudent, and a non-flaky test with fewer mocks might end up being harder to follow than this mocked test.
   
   I think what _would_ be a good test to add would be a variant which replaced this contrived reading-config-offset-topic-failure with a [genuine WakeupException thrown from the end of tick](https://github.com/apache/kafka/blob/ac7b5d3389fddf46bc53ab656de1fa7e2562efdb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L433-L443), which I believe is the true root cause of this issue most of the time. This is not easy with the boilerplate in this test as-is, and requires a little bit of refactoring to set up the rebalance during that block.



