Posted to user@flink.apache.org by Laura Uzcátegui <la...@gmail.com> on 2019/03/06 13:35:59 UTC

Job continuously failing after Checkpoint Restore

Hi,

We are currently running a Flink Job that has 3 operators.

Source ---> Filter ---> Sink
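
Roughly, the job is wired up like the sketch below (simplified; the topic pattern, schema, properties, and sink are placeholders rather than our actual code -- the source subscribes to Kafka topics with a regex pattern):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class DataLakeIngestionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        env.enableCheckpointing(60_000);   // checkpoint once a minute (placeholder interval)

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");    // placeholder
        props.setProperty("group.id", "data-lake-ingestion");    // placeholder
        // periodic discovery is what lets the regex subscription pick up new topics
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer011<String> source = new FlinkKafkaConsumer011<>(
                Pattern.compile("ingest-.*"),   // regex topic subscription (placeholder pattern)
                new SimpleStringSchema(),
                props);

        env.addSource(source)                          // Source
           .filter(record -> !record.isEmpty())        // Filter
           .addSink(new DiscardingSink<String>());     // Sink (placeholder)

        env.execute("Data Lake Ingestion");
    }
}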

As soon as the job is started, it tries to recover from the latest checkpoint:

[05-Mar-2019 13:09:55.365 UTC] INFO <CheckpointCoordinator> Restoring from latest valid checkpoint: Checkpoint 60 @ 1551788864502 for fd697c91437216e773bb862cbae56e0f.


Then, during operator initialization, specifically in the Source operator (which reads from Kafka topics using a regex pattern), the job starts to fail with the following exception:


[05-Mar-2019 13:10:11.756 UTC] INFO <ExecutionGraph> Job Data Lake Ingestion (fd697c91437216e773bb862cbae56e0f) switched from state RUNNING to FAILING.
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:126)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:123)
    at java.util.TreeMap.put(Unknown Source)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:724)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Unknown Source)
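
For what it's worth, the comparator in the stack trace appears to order partitions by topic name and then partition id, so a partition whose topic comes back null from the restored state would fail exactly at TreeMap.put. Below is a tiny standalone illustration of that failure mode (my own simplified stand-ins, not the actual Flink classes):

import java.util.Comparator;
import java.util.TreeMap;

public class NullTopicRepro {

    // Simplified stand-in for Flink's KafkaTopicPartition: just a topic name and a partition id.
    static class TopicPartition {
        final String topic;      // suspected to be null after the state restore
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Simplified stand-in for KafkaTopicPartition$Comparator: order by topic, then partition id.
    static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION = (a, b) -> {
        int byTopic = a.topic.compareTo(b.topic);   // throws NullPointerException if a topic is null
        return byTopic != 0 ? byTopic : Integer.compare(a.partition, b.partition);
    };

    public static void main(String[] args) {
        TreeMap<TopicPartition, Long> restoredOffsets = new TreeMap<>(BY_TOPIC_THEN_PARTITION);
        restoredOffsets.put(new TopicPartition("some-topic", 0), 42L);
        // A partition whose topic came back null from the restored state:
        restoredOffsets.put(new TopicPartition(null, 1), 17L);   // TreeMap.put -> compare -> NPE
    }
}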


I was wondering if anyone has seen this before?


My assumption will be


We are currently running with the following settings:



   - Flink version: 1.4.2
   - Docker image with the job embedded
   - Job parallelism: 8

Cheers,

Laura

Re: Job continuously failing after Checkpoint Restore

Posted by Yun Tang <my...@live.com>.
Hi Laura

From the exception stack, there are two possible causes of this NPE: either a restored KafkaTopicPartition itself is null, or the topic field of a KafkaTopicPartition from the union state is null. In either case, the problem likely lies in the KryoSerializer used to de/serialize the KafkaTopicPartition class. Gordon (in CC), who is an expert on serialization, might offer more insights.
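
If I remember the 1.4 code correctly, the restore path in FlinkKafkaConsumerBase#initializeState looks roughly like the sketch below (simplified from memory, not the exact 1.4.2 source; the state name and generics may differ). Each element of the union list state is deserialized and put into a TreeMap ordered by KafkaTopicPartition$Comparator, which is where a null partition or a null topic would throw:

// Simplified sketch of the consumer's restore path (from memory, not the actual source).
OperatorStateStore stateStore = context.getOperatorStateStore();

ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates =
        stateStore.getUnionListState(new ListStateDescriptor<>(
                "topic-partition-offset-states",   // state name from memory
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

if (context.isRestored()) {
    // Restored offsets are sorted into a TreeMap keyed by KafkaTopicPartition ...
    TreeMap<KafkaTopicPartition, Long> restoredState =
            new TreeMap<>(new KafkaTopicPartition.Comparator());
    for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
        // ... so TreeMap.put invokes the comparator; a null partition or a null topic
        // coming out of the deserialized union state would throw the NPE right here.
        restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
    }
}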

Before going further, could you please provide some more information:

  1.  Which version of Kafka are you using?
  2.  Have you run into this problem before?
  3.  Did you change anything before resuming the job?
  4.  If you restore from checkpoint-60 again by submitting the job anew (for example as sketched below), do you keep hitting this NPE?
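
For point 4, assuming externalized checkpoints are enabled and the checkpoint metadata is still retained under your configured state.checkpoints.dir, resubmitting from checkpoint 60 would look roughly like this (paths and jar name are placeholders):

flink run -s hdfs:///path/to/checkpoints/fd697c91437216e773bb862cbae56e0f/chk-60 -p 8 your-job.jar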

Best
Yun Tang