Posted to commits@druid.apache.org by "gianm (via GitHub)" <gi...@apache.org> on 2023/03/07 13:58:25 UTC

[GitHub] [druid] gianm opened a new issue, #13894: Uncaught ParseException when reading Avro from Kafka

gianm opened a new issue, #13894:
URL: https://github.com/apache/druid/issues/13894

   There's a problematic code path in `StreamChunkParser.parseWithInputFormat`:
   
   ```
   try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
       byteEntityReader.read(),
       rowFilter,
       rowIngestionMeters,
       parseExceptionHandler
   )) {
     rowIterator.forEachRemaining(rows::add);
   }
   ```
   
   It's a problem because `byteEntityReader.read()` can itself throw a `ParseException`, but nothing in the call stack catches it. The iterator handles parse exceptions raised while iterating rows, not ones raised while creating the iterator, so a single unparseable message fails the whole task instead of being recorded as an unparseable row.
   
   Logs look like this when using Kafka + Avro + an invalid message:
   
   ```
   2023-03-01T03:52:23,217 INFO [task-runner-0-priority-0] org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Cluster ID: KeurxA5sT_eKFHuIHquSHg
   2023-03-01T03:52:23,259 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
   org.apache.druid.java.util.common.parsers.ParseException: Avro's unnecessary EOFException, detail: [https://issues.apache.org/jira/browse/AVRO-813]
   	at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:92) ~[?:?]
   	at org.apache.druid.data.input.avro.AvroStreamReader.intermediateRowIterator(AvroStreamReader.java:70) ~[?:?]
   	at org.apache.druid.data.input.IntermediateRowParsingReader.intermediateRowIteratorWithMetadata(IntermediateRowParsingReader.java:231) ~[druid-core-25.0.0.jar:25.0.0]
   	at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:49) ~[druid-core-25.0.0.jar:25.0.0]
   	at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:70) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:635) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:266) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:151) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:169) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449) ~[druid-indexing-service-25.0.0.jar:25.0.0]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
   	at java.lang.Thread.run(Thread.java:834) ~[?:?]
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:850) ~[?:?]
   	at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:372) ~[?:?]
   	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:289) ~[?:?]
   	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:209) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[?:?]
   	at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:88) ~[?:?]
   	... 17 more
   2023-03-01T03:52:23,268 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted rows[0] and (estimated) bytes[0]
   2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed in-memory data with commit metadata [AppenderatorDriverMetadata{segments={}, lastSegmentIds={}, callerMetadata={nextPartitions=SeekableStreamEndSequenceNumbers{stream='test-duba', partitionSequenceNumberMap={0=0}}}}] for segments: 
   2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], sinks: [0], total fireHydrants (across sinks): [0], persisted fireHydrants (across sinks): [0]
   2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Resetting generation and member id due to: consumer pro-actively leaving the group
   2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Request joining group due to: consumer pro-actively leaving the group
   2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
   2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
   ```
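   One possible direction for a fix, sketched below. This is not Druid's actual patch: `ParseException`, the handler, and `read()` are modeled here with minimal hypothetical stand-ins so the pattern is self-contained. The idea is to catch a `ParseException` thrown by `read()` itself and route it through the parse-exception handler, the same way per-row failures are already handled during iteration:

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   public class StreamChunkParserSketch {
     // Stand-in for Druid's ParseException (hypothetical, for illustration only).
     static class ParseException extends RuntimeException {
       ParseException(String msg) { super(msg); }
     }

     // Stand-in for the parse-exception handler: records errors instead of failing the task.
     static class ParseExceptionHandler {
       final List<String> unparseable = new ArrayList<>();
       void handle(ParseException e) { unparseable.add(e.getMessage()); }
     }

     // Models byteEntityReader.read(): may throw ParseException up front (e.g. truncated Avro bytes).
     static Iterator<String> read(byte[] bytes) {
       if (bytes.length == 0) {
         throw new ParseException("Avro's unnecessary EOFException");
       }
       return List.of(new String(bytes)).iterator();
     }

     // The sketch: catch ParseException from read() itself, not only from iteration.
     static void parseChunk(byte[] bytes, List<String> rows, ParseExceptionHandler handler) {
       final Iterator<String> rowIterator;
       try {
         rowIterator = read(bytes);
       } catch (ParseException e) {
         handler.handle(e); // record the unparseable chunk and keep the task alive
         return;
       }
       rowIterator.forEachRemaining(rows::add);
     }

     public static void main(String[] args) {
       ParseExceptionHandler handler = new ParseExceptionHandler();
       List<String> rows = new ArrayList<>();
       parseChunk(new byte[0], rows, handler);     // invalid message: handled, not thrown
       parseChunk("ok".getBytes(), rows, handler); // valid message: parsed normally
       System.out.println(rows.size() + " row(s), " + handler.unparseable.size() + " unparseable");
     }
   }
   ```

   With this shape, an invalid message increments the unparseable count and the task continues, rather than the exception propagating up through `runInternal` as in the stack trace above.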


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 closed issue #13894: Uncaught ParseException when reading Avro from Kafka

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 closed issue #13894: Uncaught ParseException when reading Avro from Kafka
URL: https://github.com/apache/druid/issues/13894

