You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2019/07/23 01:22:50 UTC

Re: Questions about KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Hello Shurong,

What you bumped into seems to be the same issue as is reported tracked
by Luying
Liu here: https://issues.apache.org/jira/browse/KAFKA-8676.


Guozhang

On Thu, Jun 27, 2019 at 10:48 AM srlin <sh...@163.com> wrote:

> Hi, team,
>
> We are using Kafka Connect at present and have encountered a problem with
> version 2.1.0 that all connectors kept restarting when one new connector
> was added into the cluster which then failed to start due to some network
> problem (firewall not open).
> And the Connect daemon failed to serve requests since the herder thread
> was blocked, and we could not delete the failed connector until we
> restarted the Connect process.
>
> We searched for solutions in new versions and found that KIP-415 may be
> what we want since it seems to be able to avoid the stop-the-world behavior
> when any connector change happens.
>
> After test on branch:2.3.0 which includes KIP-415, we found an unexpected
> behavior.
>
> Here are the steps for reproducing the case:
>
> 1. setup a Kafka Connect cluster by starting one worker with distributed
> config connecting to an existing Kafka
> 2. create two topics (say T1, T2) to be the destination for two source
> connectors
> 3. create a FileStreamSourceConnector (say C1) to write file content to T1
> and from status API we can see the tasks of C1 are running
> 4. create another FileStreamSourceConnector (say C2) to write file content
> to T2
>
> After finishing these steps, what we expected is that all the tasks of C1
> will not be restarted since they are running independently and they don’t
> care about the new added C2 and its tasks.
> But the actual behavior is that all the tasks of C1 will be restarted
> according to the INFO and DEBUG logs.
>
>
> The log is too long and is shown partly below (C1 named file_sync_1 and C2
> named file_sync_2):
> ———————————————————————————————————————————————————————————————————————————————————
> ——————
> [2019-06-27 22:55:04,621] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Connector file_sync_2 config updated
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1268)
>
> …
>
> [2019-06-27 22:55:05,134] DEBUG [Worker clientId=connect-1,
> groupId=connect-cluster] Augmented new assignment: Assignment{error=0,
> leader='connect-1-903c66b2-ba68-4e4f-b27f-7cf58ce41f77', leaderUrl='
> http://127.0.0.1:8083/', offset=10, connectorIds=[file_sync_2,
> file_sync_1], taskIds=[file_sync_1-0], revokedConnectorIds=[],
> revokedTaskIds=[], delay=0}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:198)
>
> …
>
> [2019-06-27 22:55:05,134] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Starting connector file_sync_2
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1075)
>
> …
>
> [2019-06-27 22:55:05,630] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Tasks [file_sync_2-0, file_sync_1-0] configs
> updated
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1283)
>
> ...
>
> [2019-06-27 22:55:06,133] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] Handling task config update by restarting tasks
> [file_sync_1-0]
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:498)
> [2019-06-27 22:55:06,133] INFO Stopping task file_sync_1-0
> (org.apache.kafka.connect.runtime.Worker:684)
>
> …
> ———————————————————————————————————————————————————————————————————————————————————
> ——————
>
>
>
> From the log we can see the creation of C2 led to the restart of tasks of
> C1 (file_sync_1-0), which means the creation of one connector will have
> impact on all tasks in this one-worker cluster.
>
> We went through the code and found the restart behavior is triggered by
> the following logic:
>
> ——————————————————————————————— ———————————————————————————
> //
> connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
> Line: 488 ~ 501
>
> private void
> processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId>
> taskConfigUpdates) {
>     Set<ConnectorTaskId> localTasks = assignment == null
>                                       ? Collections.emptySet()
>                                       : new HashSet<>(assignment.tasks());
>     Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
>             .map(ConnectorTaskId::connector).collect(Collectors.toSet());
>
>     List<ConnectorTaskId> tasksToStop = localTasks.stream()
>             .filter(taskId ->
> connectorsWhoseTasksToStop.contains(taskId.connector()))
>             .collect(Collectors.toList());
>     log.info("Handling task config update by restarting tasks {}",
> tasksToStop);
>     worker.stopAndAwaitTasks(tasksToStop);
>     tasksToRestart.addAll(tasksToStop);
> }
>
> // The taskConfigUpdates originally comes from
> connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
> Line: 620 ~ 637
>
>     } else {
>         if (deferred != null) {
>             taskConfigs.putAll(deferred);
>             updatedTasks.addAll(taskConfigs.keySet());
>         }
>         inconsistent.remove(connectorName);
>     }
>     // Always clear the deferred entries, even if we didn't apply them. If
> they represented an inconsistent
>     // update, then we need to see a completely fresh set of configs after
> this commit message, so we don't
>     // want any of these outdated configs
>     if (deferred != null)
>         deferred.clear();
>
>     connectorTaskCounts.put(connectorName, newTaskCount);
> }
>
> if (started)
>     updateListener.onTaskConfigUpdate(updatedTasks);
> ———————————————————————————————————————————————————————————
>
> As we can see the updatedTasks contain the running tasks of previous
> assignment.
>
> So the question here is:
>
> Is this an expected behavior by designed? If yes, is it also the same
> behavior when other connector level changes happen such as config update of
> a connector or a connector restart?
>
> Please correct me if I got anything wrong. Any feedback would be greatly
> appreciated. Thank you!
>
> Regards,
> Shurong
>
>
>
>

-- 
-- Guozhang