Posted to dev@flink.apache.org by "Sanjar Akhmedov (JIRA)" <ji...@apache.org> on 2015/12/02 17:43:11 UTC

[jira] [Created] (FLINK-3101) Flink Kafka consumer crashes with NPE when it sees deleted record

Sanjar Akhmedov created FLINK-3101:
--------------------------------------

             Summary: Flink Kafka consumer crashes with NPE when it sees deleted record
                 Key: FLINK-3101
                 URL: https://issues.apache.org/jira/browse/FLINK-3101
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 0.10.1, 1.0.0
         Environment: Apache Flink 0.10.1 binary for Hadoop 2.6.0 with Scala 2.10.
            Reporter: Sanjar Akhmedov


Kafka allows a record to be deleted from the log by sending a record with that record's key and a null payload. Consumers can still see these null values (deletes) until they are removed by compaction (the delete retention point).

The Flink Kafka consumer crashes with an NPE when it sees such a record:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
{noformat}
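
For illustration only, here is a minimal sketch of the kind of null check that would let a consumer loop survive such records. It assumes the NPE comes from handling the null payload without a check, which the trace alone does not confirm. The names TombstoneGuard, FetchedRecord and decodeSkippingDeletes are hypothetical and are not Flink or Kafka classes, and skipping the record is only one possible policy (passing the delete downstream is another).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TombstoneGuard {

    // Hypothetical stand-in for a fetched Kafka message (not a real Kafka or Flink type).
    static final class FetchedRecord {
        final byte[] key;
        final byte[] payload; // null marks a delete

        FetchedRecord(byte[] key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Decodes payloads as UTF-8 strings and skips records whose payload is null,
    // instead of handing the null to the decoding step.
    static List<String> decodeSkippingDeletes(List<FetchedRecord> records) {
        List<String> out = new ArrayList<>();
        for (FetchedRecord r : records) {
            if (r.payload == null) {
                continue; // deleted record: nothing to decode
            }
            out.add(new String(r.payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] k1 = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] k2 = "k2".getBytes(StandardCharsets.UTF_8);
        List<FetchedRecord> records = Arrays.asList(
                new FetchedRecord(k1, "first".getBytes(StandardCharsets.UTF_8)),
                new FetchedRecord(k1, null), // delete for k1
                new FetchedRecord(k2, "second".getBytes(StandardCharsets.UTF_8)));
        System.out.println(decodeSkippingDeletes(records));
    }
}
```

Without the null check, the call to new String(...) in this sketch throws a NullPointerException on the second record, which mirrors the failure mode reported above.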


