Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/05/23 16:37:11 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

yashmayya opened a new pull request, #13750:
URL: https://github.com/apache/kafka/pull/13750

   - Handle the edge case in `DistributedHerder::stopConnector` where the forced read to the end of the config topic (`refreshConfigSnapshot`) times out
   - https://github.com/apache/kafka/pull/13465#discussion_r1200500990
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1216967433


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
                     writeTaskConfigs(connName, Collections.emptyList());
                     configBackingStore.putTargetState(connName, TargetState.STOPPED);
                     // Force a read of the new target state for the connector
-                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+                        throw new ConnectException("Failed to read to end of config topic");

Review Comment:
   Yeah, that does seem to make sense. So, for the stopped state for instance, we'd refresh the config topic view before writing task configs (to ensure that the connector isn't in a stopped state) or resetting connector offsets instead of doing so after writing the stopped target state itself?
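   A minimal Java sketch of the ordering asked about here. It assumes hypothetical `ConfigView` and `TaskConfigWriter` interfaces as stand-ins for the herder's config backing store (they are not the Kafka Connect API) and uses `IllegalStateException` in place of `ConnectException`. The view is refreshed before task configs are written, so a timed-out read fails the operation that actually needs an up-to-date view, and a connector found in the `STOPPED` state is skipped:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: refresh the view of the config topic *before* writing task configs,
// and skip the write if the connector turns out to be stopped.
class RefreshBeforeTaskConfigWrite {

    /** Hypothetical stand-in for the worker's local view of the config topic. */
    interface ConfigView {
        // Returns false if reading to the end of the config topic timed out
        boolean refreshSnapshot(long timeoutMs);

        // Latest known target state for the connector, e.g. "STARTED" or "STOPPED"
        String targetState(String connName);
    }

    /** Hypothetical stand-in for the synchronous write of task configs to the config topic. */
    interface TaskConfigWriter {
        void writeTaskConfigs(String connName, List<String> taskConfigs);
    }

    /** Returns true if task configs were written, false if the connector was stopped. */
    static boolean writeTaskConfigsIfNotStopped(ConfigView view, TaskConfigWriter writer,
                                                String connName, List<String> taskConfigs,
                                                long timeoutMs) {
        // Refresh first so the stopped-state check below sees the latest target state
        if (!view.refreshSnapshot(timeoutMs)) {
            throw new IllegalStateException("Failed to read to end of config topic");
        }
        if ("STOPPED".equals(view.targetState(connName))) {
            // Don't overwrite the empty task configs that were written when the connector was stopped
            return false;
        }
        writer.writeTaskConfigs(connName, new ArrayList<>(taskConfigs));
        return true;
    }
}
```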



[GitHub] [kafka] C0urante commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1218206159


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
                     writeTaskConfigs(connName, Collections.emptyList());
                     configBackingStore.putTargetState(connName, TargetState.STOPPED);
                     // Force a read of the new target state for the connector
-                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+                        throw new ConnectException("Failed to read to end of config topic");

Review Comment:
   Yep, exactly! Again though, still not sure if this is worth implementing--just figured it might be worth sharing.



[GitHub] [kafka] C0urante commented on pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13750:
URL: https://github.com/apache/kafka/pull/13750#issuecomment-1576987913

   Similar to https://github.com/apache/kafka/pull/13808#issuecomment-1576833452 and https://github.com/apache/kafka/pull/13810#issuecomment-1576871102, I've verified locally that this builds correctly and, given the extremely limited scope of the change, feel comfortable merging without waiting for CI first.


[GitHub] [kafka] yashmayya commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1204446383


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
                     writeTaskConfigs(connName, Collections.emptyList());
                     configBackingStore.putTargetState(connName, TargetState.STOPPED);
                     // Force a read of the new target state for the connector
-                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+                        throw new ConnectException("Failed to read to end of config topic");

Review Comment:
   Hmm, on second thought it might be better to just log a warning instead. Throwing an exception here will cause a 500 response to be returned to the `PUT /stop` API call even though both the empty set of task configs and the stopped target state have been produced to the config topic successfully (they're both synchronous writes).
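   A minimal sketch of the log-a-warning alternative. It assumes a hypothetical `ConfigStore` interface whose two writes are synchronous and whose `refreshSnapshot(timeoutMs)` returns `false` on timeout, mirroring the boolean returned by `refreshConfigSnapshot` in the diff above; it is not the actual `DistributedHerder` code. Because both writes have already succeeded when the refresh times out, the method only logs and returns normally, so a caller such as the `PUT /stop` handler would see success rather than an error:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical sketch of the "log a warning instead of throwing" alternative for stopping a connector.
class StopConnectorSketch {
    private static final Logger log = Logger.getLogger(StopConnectorSketch.class.getName());

    /** Hypothetical stand-in for the config backing store; both writes are assumed to be synchronous. */
    interface ConfigStore {
        void putTaskConfigs(String connName, List<String> taskConfigs);

        void putTargetState(String connName, String targetState);

        // Returns false if reading to the end of the config topic timed out
        boolean refreshSnapshot(long timeoutMs);
    }

    static void stopConnector(ConfigStore store, String connName, long workerSyncTimeoutMs) {
        // In this sketch, both writes have been produced once these calls return
        store.putTaskConfigs(connName, new ArrayList<>());
        store.putTargetState(connName, "STOPPED");
        // Force a read of the new target state for the connector
        if (!store.refreshSnapshot(workerSyncTimeoutMs)) {
            // The stop request itself succeeded, so warn rather than fail the caller
            log.warning("Failed to read to end of config topic after writing the STOPPED target state for connector "
                    + connName);
        }
    }
}
```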



[GitHub] [kafka] C0urante commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1214589790


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
                     writeTaskConfigs(connName, Collections.emptyList());
                     configBackingStore.putTargetState(connName, TargetState.STOPPED);
                     // Force a read of the new target state for the connector
-                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+                        throw new ConnectException("Failed to read to end of config topic");

Review Comment:
   Agreed, better to log a warning 👍
   
   I was also toying with the idea that we could shift the responsibility of refreshing the view of the config topic from operations that write to the topic, to operations that read from the topic. For example, if we need to check that a connector exists before pausing/resuming/restarting it, then that operation fails if we can't update our view of the topic.
   
   I don't think this is necessary to do in this PR and can be a follow-up. It's also questionable whether we should even bother, since there are still race conditions that can arise from target state updates that originate from non-leader workers.
   
   But it's at least some food for thought.
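   A rough sketch of the read-side alternative. It assumes a hypothetical `ConfigView` interface and uses a plain `Map` in place of the config topic's target-state records; none of these names come from Kafka Connect. An operation such as pause first refreshes the view and checks that the connector exists, and fails without writing anything if the refresh times out:

```java
import java.util.Map;

// Hypothetical sketch of moving the config topic refresh to operations that *read* from it:
// pause/resume/restart first bring the local view up to date, then check that the connector exists.
class ReadSideRefresh {

    /** Hypothetical stand-in for the worker's local view of the config topic. */
    interface ConfigView {
        // Returns false if reading to the end of the config topic timed out
        boolean refreshSnapshot(long timeoutMs);

        boolean contains(String connName);
    }

    /** Fails the calling operation if the view can't be refreshed or the connector is unknown. */
    static void requireFreshViewAndConnector(ConfigView view, String connName, long timeoutMs) {
        if (!view.refreshSnapshot(timeoutMs)) {
            throw new IllegalStateException("Failed to read to end of config topic; cannot verify that connector "
                    + connName + " exists");
        }
        if (!view.contains(connName)) {
            throw new IllegalArgumentException("Connector " + connName + " not found");
        }
    }

    /** Hypothetical pause operation: the target-state write only happens after the checks above pass. */
    static void pauseConnector(ConfigView view, Map<String, String> targetStates, String connName, long timeoutMs) {
        requireFreshViewAndConnector(view, connName, timeoutMs);
        targetStates.put(connName, "PAUSED");
    }
}
```

   As the comment points out, this would not close the race with target state updates that originate from non-leader workers; the sketch only models a single worker's view.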



[GitHub] [kafka] C0urante merged pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13750:
URL: https://github.com/apache/kafka/pull/13750

