Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/03/12 05:37:23 UTC

[GitHub] [incubator-druid] jihoonson opened a new issue #7239: Kafka tasks fail after resuming for incremental handoff

URL: https://github.com/apache/incubator-druid/issues/7239
 
 
   ### Affected Version
   
   0.14.0 (This bug was introduced in #6431).
   
   ### Description
   
   Here is the stack trace.
   
   ```
   2019-03-12T02:59:51,878 INFO [appenderator_persist_0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persist completed with metadata [AppenderatorDriverMetadata{segments={index_kafka_clarity-cloud0_d756771c2863b20_0=[SegmentWithState{segmentIdentifier=clarity-cloud0_2019-03-12T02:00:00.000Z_2019-03-12T03:00:00.000Z_2019-03-12T01:56:11.119Z_10, state=APPENDING}]}, lastSegmentIds={index_kafka_clarity-cloud0_d756771c2863b20_0=clarity-cloud0_2019-03-12T02:00:00.000Z_2019-03-12T03:00:00.000Z_2019-03-12T01:56:11.119Z_10}, callerMetadata={nextPartitions=SeekableStreamPartitions{stream/topic='clarity-cloud0', partitionSequenceNumberMap/partitionOffsetMap={1=103431503737, 4=6100519619, 7=6100600785, 10=6098083460, 13=6101049368, 16=6101000251, 19=6097385400, 22=6100601015, 25=6101101786, 28=6101055354}}}}]
   2019-03-12T02:59:52,005 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Pausing ingestion until resumed
   2019-03-12T02:59:52,070 INFO [qtp606061176-123] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persisting Sequences Metadata [[SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_d756771c2863b20_0', sequenceId=0, startOffsets={16=6100335393, 1=103430839175, 19=6096726535, 4=6099861583, 22=6099935437, 7=6099935104, 25=6100439713, 10=6097421649, 28=6100393134, 13=6100392241}, endOffsets={16=6101282427, 1=103431786274, 19=6097668960, 4=6100807623, 22=6100884284, 7=6100884039, 25=6101382663, 10=6098354147, 28=6101340178, 13=6101339869}, assignments=[16, 1, 19, 4, 22, 7, 25, 10, 28, 13], sentinel=false, checkpointed=true}, SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_d756771c2863b20_1', sequenceId=1, startOffsets={16=6101282427, 1=103431786274, 19=6097668960, 4=6100807623, 22=6100884284, 7=6100884039, 25=6101382663, 10=6098354147, 28=6101340178, 13=6101339869}, endOffsets={16=9223372036854775807, 1=9223372036854775807, 19=9223372036854775807, 4=9223372036854775807, 22=9223372036854775807, 7=9223372036854775807, 25=9223372036854775807, 10=9223372036854775807, 28=9223372036854775807, 13=9223372036854775807}, assignments=[16, 1, 19, 4, 22, 7, 25, 10, 28, 13], sentinel=false, checkpointed=false}]]
   2019-03-12T02:59:52,086 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Ingestion loop resumed
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[25] to sequence[6101110684].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[10] to sequence[6098092421].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[4] to sequence[6100527087].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[7] to sequence[6100609294].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[22] to sequence[6100609476].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[1] to sequence[103431512339].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[16] to sequence[6101007019].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[19] to sequence[6097392961].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[13] to sequence[6101057094].
   2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[28] to sequence[6101057589].
   2019-03-12T02:59:52,091 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
   org.apache.druid.java.util.common.ISE: Starting sequenceNumber [6101007019] does not match expected [6101282427] for partition [16]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.verifyInitialRecordAndSkipExclusivePartition(SeekableStreamIndexTaskRunner.java:1895) ~[druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:517) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:246) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:166) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_163]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_163]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_163]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_163]
   ```
   
   This happens only when `setEndOffsets()` is called with `finish = false`, a case that our unit tests do not cover. When `finish` is false, `setEndOffsets()` updates `initialOffsetsSnapshot` to the start offsets of the new sequence. However, if the task still has offsets left to consume in the current sequence before it starts the new one, `verifyInitialRecordAndSkipExclusivePartition()` can throw an error, because the offset of the record it reads can be smaller than the one stored in `intialSequenceSnapshot`:
   
   ```
     private boolean verifyInitialRecordAndSkipExclusivePartition(
         final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record,
         final Map<PartitionIdType, SequenceOffsetType> intialSequenceSnapshot
     )
     {
       if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
         if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) {
           throw new ISE(
               "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
               record.getSequenceNumber(),
               intialSequenceSnapshot.get(record.getPartitionId()),
               record.getPartitionId()
           );
         }
   ```
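   
   The failing comparison can be sketched in isolation. This is a simplified stand-in, not Druid's code: the class `InitialOffsetCheckSketch` and the method `verifyInitialRecord` are hypothetical names, partitions and offsets are modeled as plain `Integer`/`Long`, and `IllegalStateException` stands in for `ISE`. The values for partition 16 are taken from the log above: the snapshot already holds the start offset of sequence 1, while the task resumes at a lower offset that still belongs to sequence 0.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical, simplified model of the check quoted above (not Druid's actual class).
   class InitialOffsetCheckSketch {
   
       // Throws if the record's offset is below the snapshot value for its partition.
       // Partitions missing from the snapshot are not checked.
       static void verifyInitialRecord(int partition, long offset, Map<Integer, Long> snapshot) {
           Long expected = snapshot.get(partition);
           if (expected != null && Long.compare(offset, expected) < 0) {
               throw new IllegalStateException(String.format(
                   "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
                   offset, expected, partition));
           }
       }
   
       public static void main(String[] args) {
           // Assumed state after setEndOffsets(finish = false): the snapshot has been
           // moved to the start offset of the new sequence for partition 16.
           Map<Integer, Long> snapshot = new HashMap<>();
           snapshot.put(16, 6101282427L);
   
           try {
               // The task resumes at an offset that is still inside the previous sequence.
               verifyInitialRecord(16, 6101007019L, snapshot);
               System.out.println("check passed");
           } catch (IllegalStateException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   
   With these values the check rejects the first record read after resuming, even though the task is only continuing the sequence it had not finished yet.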
   
   This error is reproducible in `KafkaIndexTaskTest.testIncrementalHandOffReadsThroughEndOffsets()` if you set `finish` to false [here](https://github.com/apache/incubator-druid/blob/master/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java#L935).
   
   I'll raise a PR to fix this bug soon.
