Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/01 15:45:32 UTC

[GitHub] [druid] wyndhblb opened a new issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

wyndhblb opened a new issue #10930:
URL: https://github.com/apache/druid/issues/10930


   
   ### Affected Version
   
   0.20.1
   
   
   ### Description
   
   We have an interesting issue that we've tracked to the transactional nature of production.  The real error that seems to occur halts the entire ingest forever, and there's no way to skip it, which is unfortunate in its own right.
   
   The back trace is:
   
   ```
   2021-02-23T22:50:44,663 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
   org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test_infra_infrabus_event-50. If needed, please seek past the record to continue consumption.
   	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) ~[?:?]
   	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[?:?]
   	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[?:?]
   	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[?:?]
   	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[?:?]
   	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[?:?]
   	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[?:?]
   	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:124) ~[?:?]
   	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:98) ~[?:?]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:603) ~[druid-indexing-service-0.20.1.jar:0.20.1]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:267) [druid-indexing-service-0.20.1.jar:0.20.1]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:145) [druid-indexing-service-0.20.1.jar:0.20.1]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:451) [druid-indexing-service-0.20.1.jar:0.20.1]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:423) [druid-indexing-service-0.20.1.jar:0.20.1]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
   Caused by: org.apache.kafka.common.InvalidRecordException: Incorrect declared batch size, records still remaining in file
   2021-02-23T22:50:44,672 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
     "id" : "xxxx_316d619204b9ebf_bljepncn",
     "status" : "FAILED",
     "duration" : 236717,
     "errorMsg" : "org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test_i...",
     "location" : {
       "host" : null,
       "port" : -1,
       "tlsPort" : -1
     }
   }
   ```
   
   The hunch is that this may be an aborted production transaction upstream.  It is hard to say what the exact cause is (it could also be a compression issue causing some truncation).  In any case, in an effort to see whether it was a transaction-related problem, `isolation.level = read_uncommitted` was set; however, it is not obeyed in the downstream peons.
   
   ... a log from the peon 
   
   ```
   the druid spec log shows
   
   "consumerProperties" : {
         "bootstrap.servers" : "test-infra-kf.di.infra-prod.asapp.com:9092",
         "group.id" : "test-infrabus-druid-minute-1",
         "auto.offset.reset" : "earliest",
         "max.partition.fetch.bytes" : 10485760,
         "fetch.max.bytes" : 52428800,
         "isolation.level" : "read_uncommitted"
       },
   
   .. the log of the kafka-properties of the peon shows
   
   
   allow.auto.create.topics = true
   	client.id = consumer-kafka-supervisor-oaiankjn-1
   	client.rack = 
   	connections.max.idle.ms = 540000
   	default.api.timeout.ms = 60000
   	enable.auto.commit = false
   	exclude.internal.topics = true
   	fetch.max.bytes = 52428800
   	fetch.max.wait.ms = 500
   	fetch.min.bytes = 1
   	group.id = kafka-supervisor-oaiankjn
   	group.instance.id = null
   	heartbeat.interval.ms = 3000
   	interceptor.classes = []
   	internal.leave.group.on.close = true
   	internal.throw.on.fetch.stable.offset.unsupported = false
   	isolation.level = read_committed
   ...
   ```
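
A quick way to spot this kind of override is to diff the `consumerProperties` from the spec against the `key = value` dump the consumer prints at startup. The following is only a minimal sketch in plain Python: the helper names `parse_logged_properties` and `find_overrides` are hypothetical (not part of Druid or Kafka), and the sample values are copied from the logs above.

```python
def parse_logged_properties(log_text):
    """Parse 'key = value' lines, as printed in the consumer config dump."""
    props = {}
    for line in log_text.splitlines():
        if "=" not in line:
            continue  # skip blank lines and elisions such as "..."
        key, _, value = line.partition("=")
        if key.strip():
            props[key.strip()] = value.strip()
    return props


def find_overrides(requested, effective):
    """Return {key: (requested, effective)} where the effective value differs."""
    diffs = {}
    for key, wanted in requested.items():
        if key in effective and str(wanted) != effective[key]:
            diffs[key] = (str(wanted), effective[key])
    return diffs


# Values taken from the spec and the peon log quoted above.
requested = {
    "group.id": "test-infrabus-druid-minute-1",
    "fetch.max.bytes": 52428800,
    "isolation.level": "read_uncommitted",
}
logged = """
fetch.max.bytes = 52428800
group.id = kafka-supervisor-oaiankjn
isolation.level = read_committed
"""

overrides = find_overrides(requested, parse_logged_properties(logged))
for key, (wanted, actual) in sorted(overrides.items()):
    print(f"{key}: requested {wanted!r}, effective {actual!r}")
```

With the values above, the sketch reports both `group.id` and `isolation.level` as overridden, which matches what the peon log shows.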
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] wyndhblb commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
wyndhblb commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-788551353


   Great ... can you point me to some way to "skip" these sorts of errors, or some config/API to skip a block of offsets? One can typically do this with pure Kafka consumer groups, but in Druid I'm unsure how. I've tried manually moving the offsets in the backing DB, but that just created a hash issue and crashed everything. At the moment the peon that has the affected partition is basically in a crash loop and never gets past the error without a hard reset that moves the offsets to the end of the topic.




[GitHub] [druid] FrankChen021 commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-788497357


   Currently, `isolation.level` is not a property that can be set by the user. The upcoming 0.21 will remove this constraint. See #10551.




[GitHub] [druid] JulianJaffePinterest commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-793143846


   Ah yeah, trying to adjust offsets while ingestion is running would be difficult. I do it by suspending the supervisor, adjusting the offsets, and restarting the supervisor (this delays ingestion across all partitions, but avoids data loss). Druid could automate some of this (even to the level of having the seekable stream seek over corrupt partitions automatically), but that would violate some of the guarantees Druid makes around delivery.
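
The suspend and restart steps, and the supervisor reset suggested elsewhere in this thread, map onto the Overlord's supervisor endpoints (`/druid/indexer/v1/supervisor/<id>/suspend`, `/resume` and `/reset`). The following is a rough sketch that only builds the requests and does not send them. The Overlord URL and supervisor id are placeholders, `supervisor_request` is a hypothetical helper, and the offset adjustment itself is left out because the thread does not describe a supported way to do it.

```python
from urllib.parse import quote
from urllib.request import Request

ALLOWED_ACTIONS = ("suspend", "resume", "reset")


def supervisor_request(overlord_url, supervisor_id, action):
    """Build (but do not send) a POST request for a supervisor action."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    url = "{}/druid/indexer/v1/supervisor/{}/{}".format(
        overlord_url.rstrip("/"), quote(supervisor_id, safe=""), action
    )
    return Request(url, method="POST")


# Placeholder values; substitute the real Overlord address and supervisor id.
OVERLORD = "http://overlord.example.com:8090"
SUPERVISOR_ID = "my_datasource"

plan = [
    supervisor_request(OVERLORD, SUPERVISOR_ID, "suspend"),
    # ... adjust offsets here, by whatever means is appropriate ...
    supervisor_request(OVERLORD, SUPERVISOR_ID, "resume"),
]
for req in plan:
    print(req.get_method(), req.full_url)
```

Each request could then be sent with `urllib.request.urlopen(req)`. Note that `reset` clears the stored offsets rather than moving them to a chosen position, so it trades data loss (or re-reading) for getting unstuck.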




[GitHub] [druid] FrankChen021 commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-788622311


   I have never seen such an error thrown by Kafka. From the stack trace message, it seems that a record stored on the Kafka broker side is broken.
   
   Since Druid manages the offsets from which it consumes from Kafka, I think one way may be to reset the supervisor to the latest offset.




[GitHub] [druid] wyndhblb commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
wyndhblb commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-792775848


   Yes, I don't think this is a Druid issue, as the error originates in the Kafka client itself.  Whether the issue is in the producer or the brokers is harder to say.  I was looking for a way to "skip" the offending offsets.  The issue is that if there are multiple peons (say 8) consuming the stream, it only affects one of the peons, and one of the partitions in that peon group, meaning the partitions it has been assigned get very far behind very fast, while the others move along.  Resetting the offsets to "latest" is currently the only option, but that means 1/8th of the partitions are very far behind while the others are not, and of course a slew of data is missed, depending on the message rate.
   
   I was looking for a way to manually "skip" offsets. I tried the suggestion of `altering the backing offsets in the metadata store` from @JulianJaffePinterest; however, there's a lock and a hash associated with the values stored, so resetting them by hand caused even more trouble (all peons crashed and I was forced to reset everything).
   
   So aside from wanting `isolation.level = read_uncommitted`, perhaps another thing that's needed is an easy way to properly move offsets.  I can see this being a useful thing in general, especially if producers make "other mistakes" (like Kafka message versions that are incorrect, bad encodings, using compression that's unsupported, etc).
   
   




[GitHub] [druid] wyndhblb commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
wyndhblb commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-788922620


   Yeah, it's a "new one to me" as well.  We have a myriad of other consumers on that topic doing just fine, with no errors to speak of, but all of them are "read_uncommitted" style consumers (which is what makes me think it's something about an aborted transaction).
   
   Yes the reset "works" for a spell, but then it will appear again on another offset.




[GitHub] [druid] FrankChen021 commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-788984496


   If your assumption is right, maybe there's a bug on the Kafka broker side. I think you could ask the Kafka community for help to find out what this exception means.




[GitHub] [druid] JulianJaffePinterest commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-792398881


   I've caused similar bugs in the past while testing failure handling with Flink and similar systems. The default Kafka consumer, which Druid and most other projects use under the hood, intentionally blocks forever when it encounters invalid records. I've worked around this in the past by just blindly advancing offsets until we re-encountered parseable offsets, but that was in a context without strict guarantees. Druid could implement a similar solution (probably in `SeekableStreamIndexTaskRunner`, judging by the stack trace, but I haven't looked at the code). The tricky part will be supporting multiple different delivery semantics. Hopefully your intuition is right and standardizing the isolation level works for you, but until then manually advancing the offsets (either by resetting the supervisor to the latest offsets and thus dropping data or by altering the backing offsets in the metadata store) is probably your only option.
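
The "blindly advance until something is parseable again" workaround can be illustrated with a toy simulation. This is only a sketch of the idea in plain Python, not Druid or Kafka client code: `CorruptRecordError`, `advance_past_corrupt` and the in-memory `fetch` function are all hypothetical stand-ins. A real consumer fails per record batch and would need to seek on the actual partition.

```python
class CorruptRecordError(Exception):
    """Stand-in for a fetch failure such as Kafka's InvalidRecordException."""


def advance_past_corrupt(fetch, start_offset, end_offset, max_skip):
    """Return (first readable offset >= start_offset, list of skipped offsets).

    Raises RuntimeError if more than max_skip offsets would be skipped or if
    no readable offset exists before end_offset.
    """
    skipped = []
    offset = start_offset
    while offset < end_offset:
        try:
            fetch(offset)
            return offset, skipped
        except CorruptRecordError:
            if len(skipped) >= max_skip:
                raise RuntimeError(f"gave up after skipping {len(skipped)} offsets")
            skipped.append(offset)
            offset += 1
    raise RuntimeError("no readable offset before end of partition")


# Toy partition with ten records, three of which cannot be read.
log = {offset: f"event-{offset}" for offset in range(10)}
corrupt = {5, 6, 7}


def fetch(offset):
    if offset in corrupt:
        raise CorruptRecordError(offset)
    return log[offset]


resume_at, skipped = advance_past_corrupt(fetch, 5, 10, max_skip=10)
print(resume_at, skipped)
```

The `max_skip` bound is there so that a systematically broken partition fails loudly instead of silently dropping an unbounded amount of data, which is the delivery-guarantee concern raised in this thread.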




[GitHub] [druid] JulianJaffePinterest commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-792642992


   When I've seen similar errors, it was while testing killing or partitioning consumers mid-write. Those errors were mostly invalid offsets and the like, not this incorrect declared batch size error. There's almost certainly _something_ that's a bug here, but I'm not sure if it's in Wyndham's producer code (and the bug just wasn't noticed before because the consumers were reading uncommitted), Kafka itself, or Druid. I'd guess 1 and 3 are more likely than 2 but you never know.




[GitHub] [druid] wyndhblb commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
wyndhblb commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-794241003


   @JulianJaffePinterest agreed, there's a "data loss" condition, but on the flip side, allowing some settings that enable skipping would lessen the data loss (at the user's own discretion of course; defaults should always be as strict as possible). On a system handling a million messages per minute, just resetting all offsets to "latest" is a very nasty chunk of data loss, versus skipping the few "corrupted in some weird way" messages.




[GitHub] [druid] FrankChen021 commented on issue #10930: Kafka Injest: Not obeying: ioConfig.consumerProperties -> isolation.level = read_uncommitted

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on issue #10930:
URL: https://github.com/apache/druid/issues/10930#issuecomment-792410803


   Hi @JulianJaffePinterest, do you know why and when this exception occurs? I don't know whether it's a bug in the Kafka broker.

