Posted to user@flink.apache.org by Sugandha Amatya <su...@gmail.com> on 2018/02/25 06:04:47 UTC

Flink job stops when reading file from FTP

Is anyone else having problems reading files from FTP? My job suddenly stops.
What is a good strategy for moving old files off the FTP server?
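
On the second question, one approach would be to archive files by age, so
that a file is only moved once it has been untouched for longer than the job
could plausibly still need it. This is a minimal sketch, not a tested recipe.
It assumes the cleanup runs on the FTP server host with local filesystem
access to the upload directory, and that the FileNotFoundException in the log
comes from files being moved while splits are still pending. The class name
OldFileArchiver, the method archiveOlderThan, and the grace period are all
hypothetical; none of this is a Flink API.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: archives CSV files that have not been modified recently.
class OldFileArchiver {

    /**
     * Moves regular *.csv files in sourceDir whose last-modified time is older
     * than minAge (measured against now) into archiveDir.
     * Returns the names of the files that were moved.
     */
    static List<String> archiveOlderThan(Path sourceDir, Path archiveDir,
                                         Duration minAge, Instant now) throws IOException {
        Files.createDirectories(archiveDir);
        Instant cutoff = now.minus(minAge);

        // Collect candidates first so nothing is moved while the directory is being listed.
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(sourceDir, "*.csv")) {
            for (Path file : files) {
                if (Files.isRegularFile(file)
                        && Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    candidates.add(file);
                }
            }
        }

        List<String> moved = new ArrayList<>();
        for (Path file : candidates) {
            Files.move(file, archiveDir.resolve(file.getFileName()),
                       StandardCopyOption.REPLACE_EXISTING);
            moved.add(file.getFileName().toString());
        }
        return moved;
    }
}
```

A scheduled task could call archiveOlderThan(uploadDir, archiveDir,
Duration.ofHours(1), Instant.now()), where both directories and the one-hour
grace period are placeholders. The grace period would need to be longer than
the time the job can take between discovering a file and finishing reading it.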

https://stackoverflow.com/questions/48797835/flink-streaming-file-not-found


03:49:58,518 INFO
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
- Forwarding split: [13]
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_205_133669.csv
mod@ 1519489200000 : 0 + 763
03:49:58,522 INFO  org.apache.flink.runtime.taskmanager.Task
     - Source: Custom File Source (1/1) (3d444a92817df120fcf6579329ffea39)
switched from CANCELING to CANCELED.
03:49:58,522 INFO  org.apache.flink.runtime.taskmanager.Task
     - Freeing task resources for Source: Custom File Source (1/1)
(3d444a92817df120fcf6579329ffea39).
03:49:58,522 INFO  org.apache.flink.runtime.taskmanager.Task
     - Ensuring all FileSystem streams are closed for task Source: Custom
File Source (1/1) (3d444a92817df120fcf6579329ffea39) [CANCELED]
03:49:58,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager
      - Un-registering task and sending final execution state CANCELED to
JobManager for task Source: Custom File Source
(3d444a92817df120fcf6579329ffea39)
03:49:58,537 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
      - Source: Custom File Source (1/1) (3d444a92817df120fcf6579329ffea39)
switched from CANCELING to CANCELED.
03:49:58,537 INFO
org.apache.flink.runtime.client.JobSubmissionClientActor      - 02/25/2018
03:49:58 Source: Custom File Source(1/1) switched to CANCELED
02/25/2018 03:49:58 Source: Custom File Source(1/1) switched to CANCELED
03:49:58,537 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
      - Try to restart or fail the job Flink Streaming Job
(7e45fa4918ba225a7c224d64cbade4fd) if no longer possible.
03:49:58,537 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
      - Job Flink Streaming Job (7e45fa4918ba225a7c224d64cbade4fd) switched
from state FAILING to FAILED.
java.io.IOException: Error opening the Input Split
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
[61440,4096]: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:309)
Caused by: java.io.FileNotFoundException: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:450)
at org.apache.hadoop.fs.ftp.FTPFileSystem.open(FTPFileSystem.java:196)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at
org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
03:49:58,537 INFO
org.apache.flink.runtime.client.JobSubmissionClientActor      - 02/25/2018
03:49:58 Job execution switched to status FAILED.
02/25/2018 03:49:58 Job execution switched to status FAILED.
03:49:58,537 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
      - Could not restart the job Flink Streaming Job
(7e45fa4918ba225a7c224d64cbade4fd) because the restart strategy prevented
it.
java.io.IOException: Error opening the Input Split
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
[61440,4096]: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:309)
Caused by: java.io.FileNotFoundException: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:450)
at org.apache.hadoop.fs.ftp.FTPFileSystem.open(FTPFileSystem.java:196)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at
org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
03:49:58,537 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping
checkpoint coordinator for job 7e45fa4918ba225a7c224d64cbade4fd
03:49:58,537 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Shutting down
03:49:58,600 INFO  org.apache.flink.runtime.client.JobClient
     - Job execution failed
03:49:58,600 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
     - Stopping FlinkMiniCluster.
03:49:58,662 INFO  org.apache.flink.runtime.taskmanager.TaskManager
      - Stopping TaskManager akka://flink/user/taskmanager_1#-34197196.
03:49:58,662 INFO  org.apache.flink.runtime.jobmanager.JobManager
      - Stopping JobManager akka://flink/user/jobmanager_1.
03:49:58,662 INFO  org.apache.flink.runtime.taskmanager.TaskManager
      - Disassociating from JobManager
03:49:58,678 INFO  org.apache.flink.runtime.blob.PermanentBlobCache
      - Shutting down BLOB cache
03:49:58,678 INFO  org.apache.flink.runtime.blob.TransientBlobCache
      - Shutting down BLOB cache
03:49:58,678 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
      - I/O manager removed spill file directory
C:\Users\ADMINI~1\AppData\Local\Temp\2\flink-io-f2ef6a62-bcb5-401c-ab2e-c30fdfbf5a02
03:49:58,678 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment
      - Shutting down the network environment and its components.
03:49:58,694 INFO  org.apache.flink.runtime.blob.BlobServer
      - Stopped BLOB server at 0.0.0.0:60494
03:49:58,709 INFO
org.apache.flink.runtime.client.JobSubmissionClientActor      - Terminate
JobClientActor.
03:49:58,709 INFO
org.apache.flink.runtime.client.JobSubmissionClientActor      - Disconnect
from JobManager Actor[akka://flink/user/jobmanager_1#-1884774819].
03:49:58,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager
      - Task manager akka://flink/user/taskmanager_1 is completely shut
down.
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Error opening the Input Split
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
[61440,4096]: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:309)
Caused by: java.io.FileNotFoundException: File
ftp://userflink:userflink@10.9.2.11/user/content/data20180225_307_274804.csv
does not exist.
at
org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:450)
at org.apache.hadoop.fs.ftp.FTPFileSystem.open(FTPFileSystem.java:196)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at
org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)