Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/29 11:26:54 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

yashmayya opened a new pull request, #12568:
URL: https://github.com/apache/kafka/pull/12568

   - https://issues.apache.org/jira/browse/KAFKA-14015: A config provider (https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) that dynamically reloads secrets doesn't work as expected when Connect runs in standalone mode: tasks continue running with the old secret value after the secret is rotated.
   - When a config provider needs to tell the herder that a connector must be restarted so that a changed secret is propagated to the connector and its tasks, it calls `Herder::restartConnector`.
   - The `DistributedHerder` implementation of `restartConnector` works as expected: the connector is stopped, then started, and if it is in the `STARTED` state at the end of startup, a task reconfiguration is requested. A task reconfiguration asks the connector for a list of task configs (generated from the latest connector config), compares it with the running task configs and, if a change is detected, writes the new set of task configs to the config topic, which eventually results in the old set of tasks being stopped and a new set of tasks being brought up.
   - In the `StandaloneHerder` implementation, however, the step of requesting task reconfiguration is missing. This PR adds that step, which should resolve https://issues.apache.org/jira/browse/KAFKA-14015
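
The restart flow described in the list above can be sketched roughly as follows. This is a simplified model, not the Kafka Connect code: `SketchHerder`, `SketchTargetState`, the `task.index` key and the `reconfigureTasks` flag are all hypothetical names introduced for illustration, and stopping and starting the connector is reduced to a comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; these types do not exist in Kafka Connect.
enum SketchTargetState { STARTED, PAUSED }

class SketchHerder {
    // Latest connector config, e.g. after a secret has been rotated.
    private final Map<String, String> connectorConfig = new HashMap<>();
    // Task configs that the currently "running" tasks were started with.
    private List<Map<String, String>> runningTaskConfigs = new ArrayList<>();
    private final SketchTargetState targetState;
    private final int numTasks;

    SketchHerder(SketchTargetState targetState, int numTasks) {
        this.targetState = targetState;
        this.numTasks = numTasks;
    }

    void updateConnectorConfig(String key, String value) {
        connectorConfig.put(key, value);
    }

    List<Map<String, String>> runningTaskConfigs() {
        return runningTaskConfigs;
    }

    // Stands in for asking the connector for task configs based on its latest config.
    private List<Map<String, String>> generateTaskConfigs() {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(connectorConfig);
            taskConfig.put("task.index", String.valueOf(i));
            configs.add(taskConfig);
        }
        return configs;
    }

    // Compares freshly generated task configs with the running ones and
    // replaces the running set only if something changed.
    boolean requestTaskReconfiguration() {
        List<Map<String, String>> latest = generateTaskConfigs();
        if (latest.equals(runningTaskConfigs)) {
            return false;
        }
        runningTaskConfigs = latest; // stands in for stopping old tasks and starting new ones
        return true;
    }

    // reconfigureTasks=false models a restart that skips the reconfiguration step.
    boolean restartConnector(boolean reconfigureTasks) {
        // Stopping and starting the connector would happen here.
        if (reconfigureTasks && targetState == SketchTargetState.STARTED) {
            return requestTaskReconfiguration();
        }
        return false;
    }
}

class RestartSketchDemo {
    public static void main(String[] args) {
        SketchHerder herder = new SketchHerder(SketchTargetState.STARTED, 1);
        herder.updateConnectorConfig("secret", "v1");
        herder.restartConnector(true);

        herder.updateConnectorConfig("secret", "v2"); // the secret is rotated
        herder.restartConnector(false);               // restart without reconfiguration
        System.out.println(herder.runningTaskConfigs().get(0).get("secret"));

        herder.restartConnector(true);                // restart with reconfiguration
        System.out.println(herder.runningTaskConfigs().get(0).get("secret"));
    }
}
```

In this model, a restart that skips the reconfiguration step leaves the task holding the old value of `secret`, which mirrors the symptom reported in the JIRA ticket.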


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12568:
URL: https://github.com/apache/kafka/pull/12568#discussion_r961796742


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -294,7 +294,12 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
 
         worker.stopAndAwaitConnector(connName);
 
-        startConnector(connName, (error, result) -> cb.onCompletion(error, null));
+        startConnector(connName, (error, targetState) -> {
+            if (targetState == TargetState.STARTED) {
+                requestTaskReconfiguration(connName);

Review Comment:
   I think this is fine as-is; we synchronize in `requestTaskReconfiguration` either way. I can't think of a significant difference between invoking that method in the `WorkerConnector`'s thread and invoking it via the herder's `requestExecutorService`.
   
   In fact, I don't think we need to invoke anything via the `requestExecutorService` except in the variant of `restartConnector` that accepts a delay. But that's out of scope for this PR and should be addressed separately in order to simplify review.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -812,7 +861,7 @@ public void testPutConnectorConfig() throws Exception {
         FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
         herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, reconfigureCallback);
         Herder.Created<ConnectorInfo> newConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.SECONDS);
-        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
+        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)),

Review Comment:
   My IDE has been nagging me about this one too, for years. I've left it as-is (maybe not here but in other places) since it's not performance-critical and it's easier to modify in case we need to add other elements, or construct an empty list. I think we should revert this change.
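
For readers unfamiliar with the trade-off in this comment, here is a small sketch using only `java.util` calls. The class and method names (`ListFactorySketch`, `viaAsList`, `viaSingletonList`) are made up for illustration and do not appear in the PR. It shows that the two factories produce equal lists for a single element, while the `Arrays.asList` call site also accepts zero or several elements without switching to another method.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, for illustration only.
class ListFactorySketch {
    // Varargs call site: works unchanged for zero, one, or many elements.
    static List<String> viaAsList(String... ids) {
        return Arrays.asList(ids);
    }

    // Only valid for exactly one element.
    static List<String> viaSingletonList(String id) {
        return Collections.singletonList(id);
    }

    public static void main(String[] args) {
        // The two lists compare equal for a single element.
        System.out.println(viaAsList("task-0").equals(viaSingletonList("task-0")));
        // Adding a second element needs no change of factory method.
        System.out.println(viaAsList("task-0", "task-1").size());
        // Neither does an empty list.
        System.out.println(viaAsList().isEmpty());
    }
}
```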





[GitHub] [kafka] yashmayya commented on a diff in pull request #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12568:
URL: https://github.com/apache/kafka/pull/12568#discussion_r957548339


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -294,7 +294,12 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
 
         worker.stopAndAwaitConnector(connName);
 
-        startConnector(connName, (error, result) -> cb.onCompletion(error, null));
+        startConnector(connName, (error, targetState) -> {
+            if (targetState == TargetState.STARTED) {
+                requestTaskReconfiguration(connName);

Review Comment:
   I'm wondering if this should be submitted as a runnable to `requestExecutorService` instead - since `restartConnector` is also called by the `POST /{connector}/restart` API, and this change could lead to a significantly higher response latency. Also, if we were to do that, should we add a `sleep` call to the relevant unit tests and retain the same expectations / verifications (with the assumption that the requestExecutorService will complete the task in the background)?
   
   Edit: I guess a better option on the testing front might be to pass an implementation of `ScheduledExecutorService` to `StandaloneHerder` which runs tasks in the same thread.
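
The same-thread idea in the edit above could look something like the sketch below. It is deliberately simplified: `SameThreadExecutorService` is a hypothetical name, and it extends `AbstractExecutorService`, so it is only an `ExecutorService`. A real `ScheduledExecutorService` would also need the `schedule*` methods, and how such an executor would be handed to `StandaloneHerder` is not shown here.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: runs every submitted task synchronously on the caller's thread.
class SameThreadExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        // No hand-off to another thread: the task has finished when this returns.
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        // Nothing is ever queued, so there are no pending tasks to return.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Tasks complete inside execute(), so shutdown implies termination.
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }
}

class SameThreadExecutorDemo {
    public static void main(String[] args) {
        SameThreadExecutorService executor = new SameThreadExecutorService();
        Thread caller = Thread.currentThread();
        executor.submit(() -> {
            System.out.println("ran on caller thread: " + (Thread.currentThread() == caller));
        });
        executor.shutdown();
    }
}
```

With an executor like this, a test can assert on the effects of a submitted task right after the call returns, without a `sleep`.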





[GitHub] [kafka] C0urante merged pull request #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12568:
URL: https://github.com/apache/kafka/pull/12568






[GitHub] [kafka] yashmayya commented on pull request #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

Posted by GitBox <gi...@apache.org>.
yashmayya commented on PR #12568:
URL: https://github.com/apache/kafka/pull/12568#issuecomment-1230158528

   @C0urante @vamossagar12 could you please take a look at this whenever you get a chance? Thanks!




[GitHub] [kafka] yashmayya commented on a diff in pull request #12568: KAFKA-14015: StandaloneHerder::restartConnector should reconfigure tasks if configs have been changed

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12568:
URL: https://github.com/apache/kafka/pull/12568#discussion_r962112024


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -812,7 +861,7 @@ public void testPutConnectorConfig() throws Exception {
         FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
         herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, reconfigureCallback);
         Herder.Created<ConnectorInfo> newConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.SECONDS);
-        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
+        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)),

Review Comment:
   Haha yeah I only did this so that my IDE stopped nagging me 😄 
   
   >it's easier to modify in case we need to add other elements, or construct an empty list
   
   Fair point, I've reverted this.


