Posted to user@flink.apache.org by "Ghiya, Jay (GE Healthcare)" <Ja...@ge.com> on 2021/12/02 06:48:43 UTC

Issues in Batch Jobs Submission for a Session Cluster

Hi Flink Team,

Greetings from GE Healthcare team.

A fellow developer has also posted this issue on Stack Overflow: https://stackoverflow.com/questions/70068336/flink-job-not-getting-submitted-java-io-ioexception-cannot-allocate-memory

Summary of the post:

Here is the use case and the relevant configuration:

  1.  A Flink session cluster in Kubernetes is used to submit a batch job every minute. Each batch job runs for less than 30 seconds.
  2.  This Flink session cluster runs in an HA setup, which means it stores the job graph and its related metadata under the Flink "/recovery/default/blob/" folder for each submitted job.
  3.  A 5 GB PVC is attached to this session cluster for HA. It is backed by CephFS (https://docs.ceph.com/en/pacific/cephfs/index.html), and Rook is used for orchestration.

Ideal Working Scenario:

  1.  After a successful job submission, the metadata is created and then cleared once the job completes. The average size of the blob created under the recovery folder for HA is 150 MB, so the PVC has enough space.

Failure:

  1.  During a long run, say 100 minutes and therefore 100 job submissions, a Flink job submission fails with:


2021-11-22 09:03:11,537 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 6a71a36a3c82d8a9438c9aa9ed6b8993.
2021-11-22 09:03:14,904 ERROR org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT operation failed
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_312]
    at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_312]
    at org.apache.flink.core.fs.local.LocalDataOutputStream.write(LocalDataOutputStream.java:55) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteStreams.copy(ByteStreams.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.ByteSource.copyTo(ByteSource.java:243) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.shaded.guava30.com.google.common.io.Files.copy(Files.java:301) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:680) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110) [flink-dist_2.11-1.14.0.jar:1.14.0]

Note that the PVC is not full at this point, and the next job submission succeeds without any complaint about storage. However, because of these random failures, old blobs eventually pile up in the recovery folder and fill the PVC, after which all job submissions fail.
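
As a rough back-of-the-envelope check of the numbers above, here is a minimal sketch. It assumes binary units (1 GB = 1024 MB) and that every failed cleanup leaves one full directory of the average blob size behind; neither assumption is confirmed in this thread, and the function name is made up for illustration.

```python
# Rough estimate: how many leaked blob directories fit in the HA volume.
# Assumptions (not confirmed in the thread): 1 GB = 1024 MB, and every
# failed cleanup leaves behind one directory of the average blob size.

def leaked_blobs_until_full(pvc_mb: float, blob_mb: float) -> int:
    """Number of whole leaked blobs that fit before the volume is full."""
    if blob_mb <= 0:
        raise ValueError("blob_mb must be positive")
    return int(pvc_mb // blob_mb)


PVC_MB = 5 * 1024   # 5 GB PVC from the setup described above
BLOB_MB = 150       # average blob size under the recovery folder

capacity = leaked_blobs_until_full(PVC_MB, BLOB_MB)
print(f"about {capacity} leaked blobs fill the PVC")
```

Under these assumptions, a few dozen leaked directories are enough to exhaust the volume, which is why even occasional cleanup misses matter at one submission per minute.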

We request immediate help here, folks. Any pointers as to why this happens would be appreciated. We rely on HA to make sure each job runs fine, and this is mission-critical data.
Secondly, we would like to understand why stale data is not being cleared. Is there a configuration we have missed? Fixing this would not solve the data loss we experience from intermittent job submission failures, but it would keep our PVC from filling up unnecessarily to the point where all jobs fail.

Thank you in advance.

-Jay
GEHC






Re: FW: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

Posted by David Morávek <dm...@apache.org>.
Hi,

As far as I understand, "java.io.IOException: Cannot allocate memory"
happens when the JVM is not able to allocate new memory from the OS. If
that's the case, I'd suggest increasing the JVM overhead [1], because that's
basically a pool of memory that is free to be allocated by native
libraries.
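
For context, here is a small sketch of how the JobManager JVM overhead ends up being sized when only the total process memory is configured: a fraction of the process size, clamped between a minimum and a maximum. The defaults used below (fraction 0.1, min 192 MB, max 1 GB, corresponding to the `jobmanager.memory.jvm-overhead.*` options) are quoted from memory of the Flink documentation and should be checked against your version; the process sizes are purely hypothetical.

```python
# Sketch of the JVM overhead derivation: fraction of total process memory,
# clamped to [min, max]. Default values are assumptions taken from memory
# of the Flink docs (jobmanager.memory.jvm-overhead.fraction/min/max).

def jvm_overhead_mb(process_mb, fraction=0.1, min_mb=192, max_mb=1024):
    """Return the derived JVM overhead in MB for a given process size."""
    return max(min_mb, min(max_mb, process_mb * fraction))


# Hypothetical process sizes, for illustration only.
for size_mb in (1600, 4096, 20480):
    print(size_mb, "->", jvm_overhead_mb(size_mb))
```

One consequence of the clamping: for a small process size, raising only the fraction may have no effect until the derived value exceeds the minimum, so the minimum may need to be raised as well.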

How are you submitting jobs into the cluster? Are you using the command line
client or web submission?

If increasing JVM overhead doesn't help, you'll need to find out where the
native memory goes. The most straightforward approach I'm aware of is to
track allocations using jemalloc [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/#detailed-configuration
[2]
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/

Best,
D.


FW: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

Posted by "Ghiya, Jay (GE Healthcare)" <Ja...@ge.com>.
Hi @David Morávek,

Please find attached the memory configuration we have set in the ConfigMap, along with the corresponding CPU, memory, and JVM usage when the issue happens.

Credits: @R, Aromal (GE Healthcare, consultant)

-Jay

From: R, Aromal (GE Healthcare, consultant) <Ar...@ge.com>
Sent: 13 December 2021 16:16
To: Ghiya, Jay (GE Healthcare) <Ja...@ge.com>
Subject: FW: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

FYI

From: R, Aromal (GE Healthcare, consultant)
Sent: 13 December 2021 16:09
To: David Morávek <da...@gmail.com>
Cc: user@flink.apache.org; Nellimarla, Aswini (GE Healthcare) <As...@ge.com>; Kumar, Vipin (GE Healthcare) <vi...@ge.com>; Maniyan, Pramod (GE Healthcare, consultant) <Pr...@ge.com>; Ghiya, Jay (GE Healthcare) <Ja...@ge.com>
Subject: RE: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

Hi Moravek,

I am attaching the Grafana dashboard graphs of the CPU and memory usage.

We have a Flink session cluster deployed with 3 JobManagers and 4 TaskManagers, with HA enabled. The config file is attached as well.

Under the path /flink/recovery/default/blob/, two folders were not deleted:

drwxr-xr-x 1 flink flink 1 Dec 13 06:07 job_3d86ca8904bafa74fb587ff344427ff0
drwxr-xr-x 1 flink flink 1 Dec 13 07:01 job_6621c5d3633423f86c809a16fa629567
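
To spot leftovers like the two above without browsing the volume by hand, something along these lines could be run against the mounted path. This is only a sketch: the `job_<32 hex characters>` naming is inferred from the listing above, the function name and the age threshold are hypothetical, and it deliberately only lists candidates, because age alone does not prove a job is finished.

```python
import re
import time
from pathlib import Path

# Directory names as they appear in the listing: "job_" + 32 hex characters.
JOB_DIR = re.compile(r"^job_[0-9a-f]{32}$")


def stale_job_dirs(blob_root, max_age_seconds, now=None):
    """Return job_* directories under blob_root older than max_age_seconds.

    Age is measured from the directory's modification time. Nothing is
    deleted; the caller decides what to do with the result.
    """
    now = time.time() if now is None else now
    stale = []
    for entry in sorted(Path(blob_root).iterdir()):
        if not entry.is_dir() or not JOB_DIR.match(entry.name):
            continue
        if now - entry.stat().st_mtime > max_age_seconds:
            stale.append(entry)
    return stale


# Example call (path from this thread, one-hour threshold is arbitrary):
# for d in stale_job_dirs("/flink/recovery/default/blob", 3600):
#     print(d)
```

Since the batch jobs here finish in under 30 seconds, any job directory that is hours old is a likely leftover, but it is worth cross-checking the job ID against the running jobs before removing anything.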

During this time the graphs (CPU and memory usage) seem normal. Could you also take a look at them?

Thanks And Regards
Aromal







RE: EXT: Re: Issues in Batch Jobs Submission for a Session Cluster

Posted by "Ghiya, Jay (GE Healthcare)" <Ja...@ge.com>.
Thanks for the prompt response. Understood, @David Morávek. We will record the CPU and memory usage of the JobManagers and TaskManagers from the Kubernetes metrics Grafana dashboard when this happens and share it here. If anything looks abnormal, we can then collect the JVM metrics for each pod (heap, non-heap, GC behaviour).

@R, Aromal (GE Healthcare, consultant) Can you please post the CPU and memory usage of the JobManagers and TaskManagers from the Kubernetes dashboard when this issue happens?

Thanks
Jay







Re: Issues in Batch Jobs Submission for a Session Cluster

Posted by David Morávek <da...@gmail.com>.
Hi Jay,

It's hard to say what's going on here. My best guess is that you're running
out of memory for your process (e.g. hitting a ulimit). Can you please start
with checking the ulimits and memory usage of your container?
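
One way to read the limits programmatically is Python's standard `resource` module, as in the sketch below. Note the assumptions: it reports the limits of the process that runs it, so it only reflects the Flink JVM if started in the same container, as the same user, with the same inherited limits; it also does not show the container's cgroup memory limit, which has to be checked separately. The helper names are made up for illustration.

```python
import resource

# Limits that are most relevant to "cannot allocate memory" style failures.
LIMITS = {
    "address space (bytes)": resource.RLIMIT_AS,
    "data segment (bytes)": resource.RLIMIT_DATA,
    "open files": resource.RLIMIT_NOFILE,
}


def describe(value):
    """Render a raw rlimit value, mapping RLIM_INFINITY to 'unlimited'."""
    return "unlimited" if value == resource.RLIM_INFINITY else str(value)


def process_limits():
    """Return {label: (soft, hard)} for the current process."""
    return {
        label: tuple(describe(v) for v in resource.getrlimit(res))
        for label, res in LIMITS.items()
    }


for label, (soft, hard) in process_limits().items():
    print(f"{label}: soft={soft} hard={hard}")
```

Running `ulimit -a` in a shell inside the container gives the same kind of information; the script form is just easier to log alongside other metrics.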

For the cleanup, right now it may happen in some failover scenarios that we
don't clean up some parts of the HA services (e.g. the blob store). There is
FLIP-194 that addresses this limitation.

At first sight this sounds a bit like a native memory leak, but these are in
general really tricky to debug. Let's start with simply getting some stats
about the actual memory usage.

Best,
D.

