Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2019/07/18 12:42:00 UTC

[jira] [Commented] (FLINK-11568) Exception in Kinesis ShardConsumer hidden by InterruptedException

    [ https://issues.apache.org/jira/browse/FLINK-11568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16887938#comment-16887938 ] 

Chesnay Schepler commented on FLINK-11568:
------------------------------------------

I see that the PR for this issue is merged. Is there anything else to do?

> Exception in Kinesis ShardConsumer hidden by InterruptedException 
> ------------------------------------------------------------------
>
>                 Key: FLINK-11568
>                 URL: https://issues.apache.org/jira/browse/FLINK-11568
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>    Affects Versions: 1.6.2
>            Reporter: Shannon Carey
>            Assignee: Shannon Carey
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When the Kinesis ShardConsumer encounters an exception, for example due to a problem in the deserializer, the root cause exception is often hidden by an uninformative InterruptedException that results from the FlinkKinesisConsumer thread being interrupted.
> Ideally, the root cause exception would be preserved and thrown so that the logs contain enough information to diagnose the issue.
> This probably affects all versions.
> Here's an example of a log message with the unhelpful InterruptedException:
> {code:java}
> 2019-02-05 13:29:31:383 thread=Source: Custom Source -> Filter -> Map -> Sink: Unnamed (1/8), level=WARN, logger=org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer, message="Error while closing Kinesis data fetcher"
> java.lang.InterruptedException: sleep interrupted
> at java.lang.Thread.sleep(Native Method)
> at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:450)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:314)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:323)
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> And here's an example of the real exception we're interested in, which is stored in KinesisDataFetcher#error but is never thrown or logged:
> {code:java}
> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
> org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper.deserialize(KinesisDeserializationSchemaWrapper.java:44)
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:332)
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> java.util.concurrent.FutureTask.run(FutureTask.java)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
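
The behaviour the description asks for can be sketched in isolation. The following is a minimal illustration, not the change that was merged for this issue: the class RootCauseFetcher and its methods reportError and awaitTermination are hypothetical stand-ins, and only the idea of a stored error field is taken from the description above. If the wait is interrupted and a consumer has already recorded a failure, the recorded failure is thrown and the InterruptedException is attached as a suppressed exception, so neither is lost.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a fetcher that coordinates consumer threads. */
class RootCauseFetcher {

    // First failure reported by any consumer thread; later reports are ignored.
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Called by a consumer thread when it fails, e.g. during deserialization. */
    void reportError(Throwable t) {
        error.compareAndSet(null, t);
    }

    /**
     * Simulates waiting for the consumer threads to shut down.
     * If a root cause was recorded, it is thrown in preference to the interruption.
     */
    void awaitTermination(long millis) throws Exception {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            Throwable root = error.get();
            if (root != null) {
                Exception wrapped = new Exception("Consumer failed before shutdown", root);
                // Keep the interruption visible without letting it mask the cause.
                wrapped.addSuppressed(ie);
                throw wrapped;
            }
            throw ie;
        }
        // Not interrupted: still surface any recorded failure.
        Throwable root = error.get();
        if (root != null) {
            throw new Exception("Consumer failed", root);
        }
    }
}
```

With this shape, a log statement that prints the thrown exception shows the deserialization failure as the cause, and the "sleep interrupted" trace appears only as a suppressed entry.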


