You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by srlin <sh...@163.com> on 2019/06/27 17:47:58 UTC

Questions about KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Hi, team,

We are using Kafka Connect at present and have encountered a problem with version 2.1.0 that all connectors kept restarting when one new connector was added into the cluster which then failed to start due to some network problem (firewall not open). 
And the Connect daemon failed to serve requests since the herder thread was blocked, and we could not delete the failed connector until we restarted the Connect process.

We searched for solutions in new versions and found that KIP-415 may be what we want since it seems to be able to avoid the stop-the-world behavior when any connector change happens.

After test on branch:2.3.0 which includes KIP-415, we found an unexpected behavior.

Here are the steps for reproducing the case:

1. setup a Kafka Connect cluster by starting one worker with distributed config connecting to an existing Kafka
2. create two topics (say T1, T2) to be the destination for two source connectors
3. create a FileStreamSourceConnector (say C1) to write file content to T1 and from status API we can see the tasks of C1 are running
4. create another FileStreamSourceConnector (say C2) to write file content to T2

After finishing these steps, what we expected is that all the tasks of C1 will not be restarted since they are running independently and they don’t care about the new added C2 and its tasks.
But the actual behavior is that all the tasks of C1 will be restarted according to the INFO and DEBUG logs.


The log is too long and is shown partly below (C1 named file_sync_1 and C2 named file_sync_2):
——————————————————————————————————————————————————————————————————————————————————— —————— 
[2019-06-27 22:55:04,621] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector file_sync_2 config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1268)

…

[2019-06-27 22:55:05,134] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Augmented new assignment: Assignment{error=0, leader='connect-1-903c66b2-ba68-4e4f-b27f-7cf58ce41f77', leaderUrl='http://127.0.0.1:8083/', offset=10, connectorIds=[file_sync_2, file_sync_1], taskIds=[file_sync_1-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:198)

…

[2019-06-27 22:55:05,134] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector file_sync_2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1075)

…

[2019-06-27 22:55:05,630] INFO [Worker clientId=connect-1, groupId=connect-cluster] Tasks [file_sync_2-0, file_sync_1-0] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1283)

...

[2019-06-27 22:55:06,133] INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling task config update by restarting tasks [file_sync_1-0] (org.apache.kafka.connect.runtime.distributed.DistributedHerder:498)
[2019-06-27 22:55:06,133] INFO Stopping task file_sync_1-0 (org.apache.kafka.connect.runtime.Worker:684)

…
——————————————————————————————————————————————————————————————————————————————————— —————— 



From the log we can see the creation of C2 led to the restart of tasks of C1 (file_sync_1-0), which means the creation of one connector will have impact on all tasks in this one-worker cluster.

We went through the code and found the restart behavior is triggered by the following logic:

——————————————————————————————— ———————————————————————————
// connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java Line: 488 ~ 501

private void processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> taskConfigUpdates) {
    Set<ConnectorTaskId> localTasks = assignment == null
                                      ? Collections.emptySet()
                                      : new HashSet<>(assignment.tasks());
    Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
            .map(ConnectorTaskId::connector).collect(Collectors.toSet());

    List<ConnectorTaskId> tasksToStop = localTasks.stream()
            .filter(taskId -> connectorsWhoseTasksToStop.contains(taskId.connector()))
            .collect(Collectors.toList());
    log.info("Handling task config update by restarting tasks {}", tasksToStop);
    worker.stopAndAwaitTasks(tasksToStop);
    tasksToRestart.addAll(tasksToStop);
}

// The taskConfigUpdates originally comes from connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java Line: 620 ~ 637

    } else {
        if (deferred != null) {
            taskConfigs.putAll(deferred);
            updatedTasks.addAll(taskConfigs.keySet());
        }
        inconsistent.remove(connectorName);
    }
    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
    // want any of these outdated configs
    if (deferred != null)
        deferred.clear();

    connectorTaskCounts.put(connectorName, newTaskCount);
}

if (started)
    updateListener.onTaskConfigUpdate(updatedTasks);
——————————————————————————————————————————————————————————— 

As we can see the updatedTasks contain the running tasks of previous assignment.

So the question here is:

Is this an expected behavior by designed? If yes, is it also the same behavior when other connector level changes happen such as config update of a connector or a connector restart?

Please correct me if I got anything wrong. Any feedback would be greatly appreciated. Thank you!

Regards,
Shurong


 

Re: Questions about KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Shurong,

What you bumped into seems to be the same issue as is reported tracked
by Luying
Liu here: https://issues.apache.org/jira/browse/KAFKA-8676.


Guozhang

On Thu, Jun 27, 2019 at 10:48 AM srlin <sh...@163.com> wrote:

> Hi, team,
>
> We are using Kafka Connect at present and have encountered a problem with
> version 2.1.0 that all connectors kept restarting when one new connector
> was added into the cluster which then failed to start due to some network
> problem (firewall not open).
> And the Connect daemon failed to serve requests since the herder thread
> was blocked, and we could not delete the failed connector until we
> restarted the Connect process.
>
> We searched for solutions in new versions and found that KIP-415 may be
> what we want since it seems to be able to avoid the stop-the-world behavior
> when any connector change happens.
>
> After test on branch:2.3.0 which includes KIP-415, we found an unexpected
> behavior.
>
> Here are the steps for reproducing the case:
>
> 1. setup a Kafka Connect cluster by starting one worker with distributed
> config connecting to an existing Kafka
> 2. create two topics (say T1, T2) to be the destination for two source
> connectors
> 3. create a FileStreamSourceConnector (say C1) to write file content to T1
> and from status API we can see the tasks of C1 are running
> 4. create another FileStreamSourceConnector (say C2) to write file content
> to T2
>
> After finishing these steps, what we expected is that all the tasks of C1
> will not be restarted since they are running independently and they don’t
> care about the new added C2 and its tasks.
> But the actual behavior is that all the tasks of C1 will be restarted
> according to the INFO and DEBUG logs.
>
>
> The log is too long and is shown partly below (C1 named file_sync_1 and C2
> named file_sync_2):
> ———————————————————————————————————————————————————————————————————————————————————
> ——————
> [2019-06-27 22:55:04,621] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Connector file_sync_2 config updated
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1268)
>
> …
>
> [2019-06-27 22:55:05,134] DEBUG [Worker clientId=connect-1,
> groupId=connect-cluster] Augmented new assignment: Assignment{error=0,
> leader='connect-1-903c66b2-ba68-4e4f-b27f-7cf58ce41f77', leaderUrl='
> http://127.0.0.1:8083/', offset=10, connectorIds=[file_sync_2,
> file_sync_1], taskIds=[file_sync_1-0], revokedConnectorIds=[],
> revokedTaskIds=[], delay=0}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:198)
>
> …
>
> [2019-06-27 22:55:05,134] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Starting connector file_sync_2
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1075)
>
> …
>
> [2019-06-27 22:55:05,630] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Tasks [file_sync_2-0, file_sync_1-0] configs
> updated
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1283)
>
> ...
>
> [2019-06-27 22:55:06,133] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Handling task config update by restarting tasks
> [file_sync_1-0]
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:498)
> [2019-06-27 22:55:06,133] INFO Stopping task file_sync_1-0
> (org.apache.kafka.connect.runtime.Worker:684)
>
> …
> ———————————————————————————————————————————————————————————————————————————————————
> ——————
>
>
>
> From the log we can see the creation of C2 led to the restart of tasks of
> C1 (file_sync_1-0), which means the creation of one connector will have
> impact on all tasks in this one-worker cluster.
>
> We went through the code and found the restart behavior is triggered by
> the following logic:
>
> ——————————————————————————————— ———————————————————————————
> //
> connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
> Line: 488 ~ 501
>
> private void
> processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId>
> taskConfigUpdates) {
>     Set<ConnectorTaskId> localTasks = assignment == null
>                                       ? Collections.emptySet()
>                                       : new HashSet<>(assignment.tasks());
>     Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
>             .map(ConnectorTaskId::connector).collect(Collectors.toSet());
>
>     List<ConnectorTaskId> tasksToStop = localTasks.stream()
>             .filter(taskId ->
> connectorsWhoseTasksToStop.contains(taskId.connector()))
>             .collect(Collectors.toList());
>     log.info("Handling task config update by restarting tasks {}",
> tasksToStop);
>     worker.stopAndAwaitTasks(tasksToStop);
>     tasksToRestart.addAll(tasksToStop);
> }
>
> // The taskConfigUpdates originally comes from
> connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
> Line: 620 ~ 637
>
>     } else {
>         if (deferred != null) {
>             taskConfigs.putAll(deferred);
>             updatedTasks.addAll(taskConfigs.keySet());
>         }
>         inconsistent.remove(connectorName);
>     }
>     // Always clear the deferred entries, even if we didn't apply them. If
> they represented an inconsistent
>     // update, then we need to see a completely fresh set of configs after
> this commit message, so we don't
>     // want any of these outdated configs
>     if (deferred != null)
>         deferred.clear();
>
>     connectorTaskCounts.put(connectorName, newTaskCount);
> }
>
> if (started)
>     updateListener.onTaskConfigUpdate(updatedTasks);
> ———————————————————————————————————————————————————————————
>
> As we can see the updatedTasks contain the running tasks of previous
> assignment.
>
> So the question here is:
>
> Is this an expected behavior by designed? If yes, is it also the same
> behavior when other connector level changes happen such as config update of
> a connector or a connector restart?
>
> Please correct me if I got anything wrong. Any feedback would be greatly
> appreciated. Thank you!
>
> Regards,
> Shurong
>
>
>
>

-- 
-- Guozhang