Posted to user@flink.apache.org by Mu Kong <ko...@gmail.com> on 2018/03/06 11:51:36 UTC

Flink is looking for Kafka topic "n/a"

Hi,

I have encountered a weird problem.
After the job had been running for several days, Flink gave me the following error:

java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='n/a', partition=-1}, KafkaPartitionHandle=[n/a,-1], offset=(not set)]
        at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)
        at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

The Flink job died after this error and tried to restart, but ultimately
in vain.

Is there any reason why Flink was unable to find a leader for the partition?
An even more confusing question is why it is trying to find topic 'n/a'
instead of the topic we specified.

Thanks in advance!

Best regards,
Mu

Re: Flink is looking for Kafka topic "n/a"

Posted by Nico Kruber <ni...@data-artisans.com>.
I think I found a code path (a race between threads) that may lead to two
markers being in the list.

I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.
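
For readers following along, here is a minimal sketch of how two threads that
each signal the fetcher could leave two markers behind, together with one
generic way to make the signal idempotent. It is not Flink's code and not
necessarily the change made for FLINK-8896: the class MarkerRaceDemo, the
String stand-in for the marker, and the use of a LinkedBlockingQueue are all
assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: hypothetical names, not Flink's actual classes. */
class MarkerRaceDemo {

    // Stand-in for the special marker partition state with topic "n/a".
    static final String MARKER = "n/a";

    /** Starts the given number of threads running the action and waits for all of them. */
    static void runConcurrently(int callers, Runnable action) {
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(action);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Unguarded: every caller enqueues its own marker, so N callers leave N markers. */
    static int signalWithoutGuard(int callers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        runConcurrently(callers, () -> queue.add(MARKER));
        return queue.size();
    }

    /** Guarded: compareAndSet lets exactly one caller enqueue the marker. */
    static int signalWithGuard(int callers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean markerQueued = new AtomicBoolean(false);
        runConcurrently(callers, () -> {
            if (markerQueued.compareAndSet(false, true)) {
                queue.add(MARKER);
            }
        });
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("markers without guard: " + signalWithoutGuard(2));
        System.out.println("markers with guard:    " + signalWithGuard(2));
    }
}
```

With two callers, the unguarded variant ends up with two markers in the queue
and the guarded variant with one. If the consumer then drops only a single
marker, the second one would be treated like a real partition.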


Nico

On 07/03/18 10:09, Mu Kong wrote:
> [...]


Re: Flink is looking for Kafka topic "n/a"

Posted by Mu Kong <ko...@gmail.com>.
Hi Gordon,

Thanks for your response.
I think I misspoke about the failure after the "n/a" exception.
The behavior after this exception was:

switched from RUNNING to CANCELING
switched from CANCELING to CANCELED
Try to restart or fail the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx) if no longer possible.
switched from state FAILING to RESTARTING
Restarting the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx)
Recovering checkpoints from ZooKeeper
Found 1 checkpoints in ZooKeeper
Trying to retrieve checkpoint 1091
Restoring from latest valid checkpoint: Checkpoint 1091 @ xxxxxxxxxxxxxxxxxxxx for xxxxxxxxxxxxxxxxxxxx
switched from CREATED to SCHEDULED
switched from SCHEDULED to DEPLOYING
switched from DEPLOYING to RUNNING
(several checkpoints)
switched from RUNNING to FAILED
TimerException{java.io.EOFException:Premature EOF: no length prefix available}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)

Since there were several successful checkpoints after the restart, I think the
later failure might be something else.
Also, could you please share more information about the MARKER in the code?
For example, which piece of code should I look at?

And thanks for the suggestion to upgrade Flink to 1.3.2.

Best regards,
Mu


On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai <tz...@gmail.com> wrote:

> [...]

Re: Flink is looking for Kafka topic "n/a"

Posted by Tzu-Li Tai <tz...@gmail.com>.
Hi Mu,

You mentioned that the job stopped after the "n/a" topic error, but the job
failed to recover.
What exception did you encounter in the restart executions? Was it the same
error?
This would verify if we actually should be removing more than one of these
special MARKER partition states.

On the other hand, if I recall correctly, the Kafka consumer had a severe
bug in 1.3.0 which could lead to potential duplicate data, which was fixed
in 1.3.2. Though I don't think it is related to the error you encountered, I
strongly recommend that you use 1.3.2 instead.

Cheers,
Gordon




Re: Flink is looking for Kafka topic "n/a"

Posted by Mu Kong <ko...@gmail.com>.
Hi Nico,

Thanks for your prompt response.
I'm using Flink 1.3.0 for this job.

Please let me know if you need more information.


Best regards,
Mu

On Tue, Mar 6, 2018 at 10:17 PM, Nico Kruber <ni...@data-artisans.com> wrote:

> [...]

Re: Flink is looking for Kafka topic "n/a"

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Mu,
which version of Flink are you using? I checked the latest branches for
1.2 - 1.5 to look for the call to findLeaderForPartitions at line 205 in
Kafka08Fetcher, but they did not match. From what I can see in the code,
there is a MARKER partition state with topic "n/a", but that is
explicitly removed from the list of partitions to find leaders for and
is used solely when cancelling the fetcher.

I don't know whether this is possible, but I suppose there could be more
than one marker and we should call removeAll() instead - @Gordon, can
you elaborate/check whether this could happen?
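
To illustrate the difference in plain Java, here is a minimal sketch. It
assumes the partitions live in an ordinary java.util.List; the class
MarkerRemovalDemo, its methods, and the String stand-in for the marker are
hypothetical and not taken from Kafka08Fetcher. List.remove(Object) removes
only the first matching element, while removeAll removes every occurrence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: hypothetical names, not Flink's actual classes. */
class MarkerRemovalDemo {

    // Stand-in for the special marker partition state with topic "n/a".
    static final String MARKER = "n/a";

    /** Drops only the first marker, which is what List.remove(Object) does. */
    static List<String> dropFirstMarker(List<String> partitions) {
        List<String> copy = new ArrayList<>(partitions);
        copy.remove(MARKER);
        return copy;
    }

    /** Drops every marker by passing a singleton collection to removeAll. */
    static List<String> dropAllMarkers(List<String> partitions) {
        List<String> copy = new ArrayList<>(partitions);
        copy.removeAll(Collections.singleton(MARKER));
        return copy;
    }

    public static void main(String[] args) {
        List<String> partitions =
                Arrays.asList("my-topic-0", MARKER, "my-topic-1", MARKER);
        System.out.println(dropFirstMarker(partitions));
        System.out.println(dropAllMarkers(partitions));
    }
}
```

With two markers in the input, dropFirstMarker still returns a list that
contains "n/a", which would then reach the leader lookup, while
dropAllMarkers returns only the real partitions.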


Nico

On 06/03/18 12:51, Mu Kong wrote:
> [...]