Posted to jira@kafka.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/03/22 07:03:00 UTC

[jira] [Commented] (KAFKA-9743) StreamTask could fail to close during HandleNewAssignment

    [ https://issues.apache.org/jira/browse/KAFKA-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064185#comment-17064185 ] 

ASF GitHub Bot commented on KAFKA-9743:
---------------------------------------

abbccdda commented on pull request #8327: KAFKA-9743: Catch commit offset exception to eventually close dirty tasks
URL: https://github.com/apache/kafka/pull/8327
 
 
   This PR closes all dirty tasks during `HandleAssignment` in case the commit call fails. Previously, the lost tasks were not properly closed, so their RocksDB metric recorders were never cleared; when the tasks were later re-created, the application crashed, since we have an illegal-state check that rejects re-adding an existing metrics recorder:
   ```
    public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
        final String metricsRecorderName = metricsRecorderName(metricsRecorder);
        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
            throw new IllegalStateException("RocksDB metrics recorder for store \"" + metricsRecorder.storeName() +
                "\" of task " + metricsRecorder.taskId().toString() + " has already been added. "
                + "This is a bug in Kafka Streams.");
        }
        metricsRecordersToTrigger.put(metricsRecorderName, metricsRecorder);
    }
   ```
   A unit test will be added shortly.
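   The catch-and-close-dirty pattern the PR describes can be sketched as follows. This is a hypothetical simplification: the `Task` interface and `handleAssignment` method below are illustrative stand-ins, not the actual Kafka Streams internals. The point is that when a commit throws during assignment handling, the task is still closed, just dirtily, so its resources (such as the metrics recorder above) are released:
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Illustrative stand-ins for Kafka Streams task internals.
   public class HandleAssignmentSketch {
       interface Task {
           void commit();      // may throw, e.g. on a broker error
           void closeClean();  // commit succeeded: normal shutdown
           void closeDirty();  // skip committing, but still free resources
                               // (state stores, metrics recorders, etc.)
       }

       // For each task being revoked: try to commit and close cleanly; if the
       // commit throws, fall back to a dirty close so resources are still freed
       // and the metrics recorder is deregistered before the task is re-created.
       static List<RuntimeException> handleAssignment(final List<Task> tasksToClose) {
           final List<RuntimeException> exceptions = new ArrayList<>();
           for (final Task task : tasksToClose) {
               try {
                   task.commit();
                   task.closeClean();
               } catch (final RuntimeException e) {
                   exceptions.add(e);   // surface the failure after all tasks are handled
                   task.closeDirty();
               }
           }
           return exceptions;
       }
   }
   ```
   Collecting the exceptions and surfacing them only after every task has been closed is what prevents one failing commit from leaking the remaining tasks' resources.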
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> StreamTask could fail to close during HandleNewAssignment
> ---------------------------------------------------------
>
>                 Key: KAFKA-9743
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9743
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> We found this particular bug happening in soak testing:
> [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 23:12:01,534] ERROR [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] stream-thread [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) java.lang.IllegalStateException: RocksDB metrics recorder for store "KSTREAM-AGGREGATE-STATE-STORE-0000000040" of task 2_2 has already been added. This is a bug in Kafka Streams.
>         at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
>         at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>         at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81)
>         at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191)
>         at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475)
>  
> This could bring the entire instance down. The bug is that if the commit fails during the task-close path, the subsequent `closeClean` call is never triggered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)