You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Sanjar Akhmedov (JIRA)" <ji...@apache.org> on 2015/12/02 17:43:11 UTC
[jira] [Created] (FLINK-3101) Flink Kafka consumer crashes with NPE
when it sees deleted record
Sanjar Akhmedov created FLINK-3101:
--------------------------------------
Summary: Flink Kafka consumer crashes with NPE when it sees deleted record
Key: FLINK-3101
URL: https://issues.apache.org/jira/browse/FLINK-3101
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 0.10.1, 1.0.0
Environment: Apache Flink 0.10.1 binary for Hadoop 2.6.0 with Scala 2.10.
Reporter: Sanjar Akhmedov
Kafka allows a records to be deleted from the log by sending a record with key and null payload. Consumers still can see those null values (deletes) before they are compacted (delete retention point).
Flink Kafka consumer crashes with NPE when it sees such record:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)