You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ewen Cheslack-Postava (JIRA)" <ji...@apache.org> on 2016/07/25 23:35:20 UTC

[jira] [Commented] (KAFKA-3935) ConnectDistributedTest.test_restart_failed_task.connector_type=sink system test failing

    [ https://issues.apache.org/jira/browse/KAFKA-3935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15392850#comment-15392850 ] 

Ewen Cheslack-Postava commented on KAFKA-3935:
----------------------------------------------

Looked into this a bit more. It looks like the timeout is happening *just* too late for the driver to catch it. There are a few steps that occur to get to the point where the exception can happen. First, we write the config and rebalance to get the connector assigned. It starts up, generates task configs, and writes them. This causes another rebalance to get both connector and task assigned. Then the connector needs to start up and get to the point of hitting the exception. The test uses a delay of 5s, which is also the timeout used for offset commits. Since there's no data, the offset commits are actually the only thing that causes put() calls to be processed -- otherwise the connector blocks indefinitely waiting for data from Kafka. It looks like what's happening is that it takes 2 rounds of put() for the failure to occur because the 5s timeout for the first offset commit is actually started as soon as WorkerSinkTask is created, but the timeout for the task failure is started a bit later. So the first put() call is executed and we aren't quite ready to fail. This means we're waiting at least 10s for the failure once the task is running, and the delay of doing all the other stuff is putting us just over the limit (it looks like less than a second). It looks like we also might be undergoing one more rebalance which is also contributing to this, although from the current logs its a bit hard to tell why that's happening.

I think I want to do a couple of things to improve the test:
1.Increase the timeout a bit. Reading the test it looks like it shouldn't be necessary since we're just waiting for a 5s timeout, but actually we're waiting for quite a bit more than that to happen.
2. Force more frequent put() calls in the MockSinkTask using SinkContext.timeout(). This will help us decouple the behavior from the offset commit timing.
3. Add logging in the Mock classes to help debugging this. It's possible to do with the logging from the framework, but would have been easier to understand what was going on if I could filter to MockConnector/Task messages to start with.

> ConnectDistributedTest.test_restart_failed_task.connector_type=sink system test failing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3935
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3935
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> This has failed a few times, see e.g. http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-07-07--001.1467911236--apache--trunk--efc4c88/report.html Note that it is *only* the sink task version, the source task one works ok.
> {code}
> ====================================================================================================
> test_id:    2016-07-06--001.kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_failed_task.connector_type=sink
> status:     FAIL
> run time:   1 minute 10.991 seconds
>     Failed to see task transition to the FAILED state
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py", line 106, in run_all_tests
>     data = self.run_single_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py", line 162, in run_single_test
>     return self.current_test_context.function(self.current_test)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py", line 331, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 175, in test_restart_failed_task
>     err_msg="Failed to see task transition to the FAILED state")
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/utils/util.py", line 36, in wait_until
>     raise TimeoutError(err_msg)
> TimeoutError: Failed to see task transition to the FAILED state
> {code}
> I checked the worker logs and it does look like we're seeing the exception:
> {code}
> [2016-07-06 15:22:19,061] ERROR Task mock-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
> java.lang.RuntimeException
>         at org.apache.kafka.connect.tools.MockSinkTask.put(MockSinkTask.java:58)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:228)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-07-06 15:22:19,062] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
> [2016-07-06 15:22:19,062] INFO WorkerSinkTask{id=mock-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
> [2016-07-06 15:22:19,065] DEBUG Group connect-mock-sink committed offset 0 for partition test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2016-07-06 15:22:19,065] DEBUG Finished WorkerSinkTask{id=mock-sink-0} offset commit successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSinkTask)
> [2016-07-06 15:22:19,065] ERROR Task mock-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:228)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-07-06 15:22:19,065] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> So this is either a timing issue or the error handling in WorkerSinkTask is not properly setting the FAILED state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)