Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2020/05/16 05:22:00 UTC

[jira] [Comment Edited] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

    [ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108867#comment-17108867 ] 

Chris Egerton edited comment on KAFKA-9981 at 5/16/20, 5:21 AM:
----------------------------------------------------------------

[~ryannedolan] I think these configuration updates come from the connector requesting task reconfiguration from the framework: [https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L232]

In distributed mode, this causes the framework to generate new task configs from the connector and then, if they've changed, try to write them to the config topic. However, only the leader is allowed to write directly to the config topic, so if the connector is hosted on a follower node, then the node has to forward those configs to the leader via the REST API: [https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1316-L1340]
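
As a rough illustration of that routing, here is a toy model rather than the actual DistributedHerder logic. Every class, method, and config key in it is hypothetical; a null endpoint stands in for a leader whose REST server cannot be reached.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: all names are hypothetical and none of this is Connect framework code.
final class TaskConfigRoutingSketch {

    /** Stand-in for the leader's internal REST endpoint that accepts task configs. */
    interface LeaderEndpoint {
        void postTaskConfigs(String connector, List<Map<String, String>> taskConfigs);
    }

    /** Stand-in for the config topic: the latest task configs per connector. */
    static final class ConfigTopic {
        final Map<String, List<Map<String, String>>> taskConfigs = new HashMap<>();
    }

    static final class Worker {
        private final boolean leader;
        private final ConfigTopic configTopic;
        private final LeaderEndpoint leaderEndpoint; // null models an unreachable REST server

        Worker(boolean leader, ConfigTopic configTopic, LeaderEndpoint leaderEndpoint) {
            this.leader = leader;
            this.configTopic = configTopic;
            this.leaderEndpoint = leaderEndpoint;
        }

        /** Returns true if the new task configs were written or handed to the leader. */
        boolean reconfigure(String connector, List<Map<String, String>> newConfigs) {
            if (leader) {
                // Only the leader writes directly to the config topic.
                configTopic.taskConfigs.put(connector, newConfigs);
                return true;
            }
            if (leaderEndpoint == null) {
                // A follower with no way to reach the leader: the update is dropped.
                return false;
            }
            leaderEndpoint.postTaskConfigs(connector, newConfigs);
            return true;
        }
    }

    public static void main(String[] args) {
        ConfigTopic topic = new ConfigTopic();
        Worker leader = new Worker(true, topic, null);
        // The leader's endpoint performs the write on the follower's behalf.
        Worker follower = new Worker(false, topic, leader::reconfigure);
        Worker isolated = new Worker(false, topic, null);

        List<Map<String, String>> configs = List.of(Map.of("example.key", "value"));
        System.out.println(follower.reconfigure("example-connector", configs));
        System.out.println(isolated.reconfigure("example-connector", configs));
    }
}
```

In this model the isolated follower is the case described in the ticket: it computes new task configs but has no path to the only node allowed to persist them.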

The endpoint for receiving these task configs was the subject of KIP-507, which sought to close a security loophole that it presented at the time. You can see the code for that internal endpoint here: [https://github.com/apache/kafka/blob/62fa8fc9a95d738780d1f73d2d758d7329828feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L268-L278]

We might consider enabling a bare-bones REST API for MM2 that only supports this internal endpoint? As long as the {{SESSIONED}} protocol introduced in KIP-507 is used by the cluster, this wouldn't present any obvious security risks since requests would have to be signed with a session key that's distributed via the config topic and presumably only readable by workers in the cluster or trusted principals that have access to that topic.
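
To make the signing argument concrete, here is a minimal sketch of HMAC-based request signing using the JDK's javax.crypto.Mac. It is not the Connect implementation: the class and method names, the key, and the payload are all made up for illustration, and HmacSHA256 is used simply because it is a standard JCA algorithm name.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch only: names are hypothetical, not Connect framework classes.
final class SignedRequestSketch {

    /** Computes an HMAC over the request body with the shared session key. */
    static byte[] sign(byte[] sessionKey, String algorithm, byte[] body) {
        try {
            Mac mac = Mac.getInstance(algorithm);
            mac.init(new SecretKeySpec(sessionKey, algorithm));
            return mac.doFinal(body);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Unable to sign request body", e);
        }
    }

    /** Recomputes the HMAC on the receiving side and compares it in constant time. */
    static boolean verify(byte[] sessionKey, String algorithm, byte[] body, byte[] signature) {
        return MessageDigest.isEqual(sign(sessionKey, algorithm, body), signature);
    }

    public static void main(String[] args) {
        byte[] sessionKey = "hypothetical-session-key".getBytes(StandardCharsets.UTF_8);
        byte[] otherKey = "some-other-key".getBytes(StandardCharsets.UTF_8);
        byte[] body = "[{\"example.key\":\"value\"}]".getBytes(StandardCharsets.UTF_8);

        byte[] signature = sign(sessionKey, "HmacSHA256", body);

        // A receiver holding the same session key accepts the request.
        System.out.println(verify(sessionKey, "HmacSHA256", body, signature));
        // A caller without the session key cannot produce a signature that verifies.
        System.out.println(verify(sessionKey, "HmacSHA256", body, sign(otherKey, "HmacSHA256", body)));
    }
}
```

The point of the sketch is only that a caller who cannot read the session key cannot forge a request the leader would accept, which is why exposing just this one endpoint seems low-risk.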


> Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9981
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9981
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: victor
>            Priority: Major
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a topic whitelist update. If that task is not running on the leader node, an HTTP request is sent to notify the leader of the configuration update. However, a dedicated mm2 cluster does not have the HTTP server turned on, so the request fails to send and the update operation is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)