Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2017/08/04 18:56:42 UTC

Re: blob store defaults to /tmp and files get deleted

Stephan,

Regarding your last reply to http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/blob-store-defaults-to-tmp-and-files-get-deleted-td11720.html

You mention "Flink (via the user code class loader) actually holds a reference to the JAR files in "/tmp", so even if "/tmp" get wiped, the JAR file remains usable by the class loader". In my understanding, even if that's true, it doesn't survive a failure of the JobManager/TaskManager process: the open handle is lost along with the process, and by then the file is already gone.
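
To make that distinction concrete, here is a minimal Java sketch of the file-handle behavior in question. It is not Flink code: the class name, method name, and temp-file prefix are made up for illustration, and it assumes a POSIX-style file system, where a deleted file stays readable through a handle that is already open.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not part of Flink.
class DeletedFileHandleDemo {

    /** Writes content to a temp file, opens it, deletes it, then reads it via the open handle. */
    static String readAfterDelete(String content) {
        try {
            Path file = Files.createTempFile("blob-demo", ".jar");
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            try (InputStream in = Files.newInputStream(file)) {
                // Stands in for "/tmp" being wiped while the process is still running.
                Files.delete(file);
                // Assumption: POSIX semantics, so the data stays readable until the handle closes.
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterDelete("pretend these are JAR bytes"));
    }
}
```

Once the process exits, the handle goes with it, and a restarted process has no path left to reopen. That is the case I'm worried about.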

We're still running Flink 1.2.1, so maybe we're missing out on some of the improvements that have been made. However, we recently had a problem with a batch (DataSet) job not restarting successfully, apparently after a JobManager failure. This particular job runs in AWS EMR (on YARN), which means that only one JobManager runs at a time, and when it fails it gets restarted.

Here's what I can see from the logs. When the job restarts, it goes from CREATED -> RUNNING state, and then logs:

23:23:56,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        (flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob (c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to SUSPENDED.
java.lang.Exception: JobManager is no longer the leader.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)

I assume that's normal/expected, because the JobManager was restarted but some portion of the job state is still referring to the old one. Next, YarnJobManager logs: "Attempting to recover job c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:

2017-08-03 00:09:18,991 WARN  org.apache.flink.yarn.YarnJobManager                          (flink-akka.actor.default-dispatcher-96): Failed to recover job c58185a78dd64cfc9f12374bd1f9a679.
java.lang.Exception: Failed to retrieve the submitted job graph from state handle.
at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
…
Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop input format
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:319)
…
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
... 15 more
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:317)
... 69 more
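
The innermost frames of the trace above show the lookup ending in sun.misc.Launcher$AppClassLoader, the JVM's application class loader, which only sees the process classpath. The following Java sketch (not Flink code; the class and method names are hypothetical) shows the general mechanism: a class that lives only in a job JAR is not loadable through the application class loader, and becomes visible only through a loader that has been given the JAR's URL.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class, not part of Flink.
class ClassLoaderVisibilityDemo {

    /** Returns true if the named class can be found through the given loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Builds a child loader that also searches the given JAR URLs (e.g. a job JAR). */
    static URLClassLoader loaderFor(URL... jars) {
        return new URLClassLoader(jars, ClassLoader.getSystemClassLoader());
    }

    public static void main(String[] args) {
        ClassLoader app = ClassLoader.getSystemClassLoader();
        String userClass = "org.apache.parquet.avro.AvroParquetInputFormat";
        // Prints false unless parquet-avro happens to be on the process classpath.
        System.out.println(userClass + " via app loader: " + isLoadable(userClass, app));
        // JDK classes are always visible through parent delegation.
        System.out.println("java.lang.String via app loader: " + isLoadable("java.lang.String", app));
    }
}
```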

The missing class comes from our job, so it seems like the job jar isn't present on the classpath of the JobManager. When I look at the contents of our configured blob storage directory (we're not using /tmp), I see subfolders like:

blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed

Only one of the two has a JAR in it, so it looks like a new directory is created for each new JobManager. When I look in ZooKeeper at nodes such as "/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I don't see those directories mentioned. Can someone explain how Flink knows where to retrieve the job JAR from when it retries a job after the JobManager has failed? Are we running into a Flink bug here?

Thanks for the info,
Shannon


Re: blob store defaults to /tmp and files get deleted

Posted by Eron Wright <er...@gmail.com>.
The directory referred to by `blob.storage.directory` is best described as
a local cache. For recovery purposes, the JARs are also stored in
`high-availability.storageDir`. At least that's my reading of the code in
1.2. Maybe there's some YARN-specific behavior too; sorry if this
information is incomplete.

https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L106
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L362
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java#L58
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L57
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java#L135
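
As a rough illustration of how the two settings relate, here is a small Java sketch that parses a flat `key: value` configuration and checks whether an HA storage directory is set alongside the local blob cache. This is only a hypothetical pre-flight check, not anything Flink ships: the class and method names are invented, and the paths and ZooKeeper mode in the sample are placeholder values. Only the key names `high-availability`, `high-availability.storageDir`, and `blob.storage.directory` are taken from the 1.2 sources linked above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink.
class HaConfigCheck {

    /** Parses flat "key: value" lines, skipping blank lines and '#' comments. */
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            int sep = trimmed.indexOf(": ");
            if (trimmed.isEmpty() || trimmed.startsWith("#") || sep < 0) {
                continue;
            }
            conf.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 2).trim());
        }
        return conf;
    }

    /** True if ZooKeeper HA mode is on and a non-empty HA storage directory is configured. */
    static boolean hasHaStorageDir(Map<String, String> conf) {
        String mode = conf.getOrDefault("high-availability", "NONE");
        String dir = conf.get("high-availability.storageDir");
        return "zookeeper".equalsIgnoreCase(mode) && dir != null && !dir.isEmpty();
    }

    public static void main(String[] args) {
        // Placeholder values, chosen only for illustration.
        String sample = String.join("\n",
                "high-availability: zookeeper",
                "high-availability.storageDir: hdfs:///flink/ha",
                "blob.storage.directory: /mnt/flink/blobs");
        Map<String, String> conf = parse(sample);
        System.out.println("local blob cache: " + conf.get("blob.storage.directory"));
        System.out.println("HA storage dir set: " + hasHaStorageDir(conf));
    }
}
```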

