You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2022/02/14 21:16:50 UTC

TM OOMKilled

Hello,
We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *), once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't see failed save points, so I assume OOM happens right after savepoint taken. However OOMKilled doesn't happen on every save point, so maybe this is a random correlation.
I've reserved 2G for JVM overhead, but somehow it is not enough ? Any known issues with memory and savepoints? Any suggestions how to troubleshoot this?

 Final TaskExecutor Memory configuration:
   Total Process Memory:          10.000gb (10737418240 bytes)
     Total Flink Memory:          7.547gb (8103395328 bytes)
       Total JVM Heap Memory:     3.523gb (3783262149 bytes)
         Framework:               128.000mb (134217728 bytes)
         Task:                    3.398gb (3649044421 bytes)
       Total Off-heap Memory:     4.023gb (4320133179 bytes)
         Managed:                 3.019gb (3241358179 bytes)
         Total JVM Direct Memory: 1.005gb (1078775000 bytes)
           Framework:             128.000mb (134217728 bytes)
           Task:                  128.000mb (134217728 bytes)
           Network:               772.800mb (810339544 bytes)
     JVM Metaspace:               256.000mb (268435456 bytes)
     JVM Overhead:                2.203gb (2365587456 bytes)

Thanks,
Alexey

Fraud detection demo with Flink 1.14

Posted by Pramit Vamsi <pr...@gmail.com>.
Hi,

Problem: Watermark does not move within Dynamic Alert Function

Implementing ideas (as is) from this article -
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
Code: https://github.com/afedulov/fraud-detection-demo
Pipeline: Kafka -> Dynamic Key Function -> Dynamic Alert Function -> Kafka
sink

Adapted code for Flink 1.14.3:

1. Init transaction source:
----------------------------------------------
KafkaSource<String> transactionSource =
TransactionsSource.createTransactionsSource(config);
int sourceParallelism = config.get(SOURCE_PARALLELISM);
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());

DataStream<String> transactionsStringsStream = env
.fromSource(transactionSource, watermarkStrategy, "KafkaTransactions")
.name("Transactions Source").setParallelism(sourceParallelism);
DataStream<Transaction> transactionsStream = TransactionsSource
.stringsStreamToTransactions(transactionsStringsStream);

return transactionsStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
.withTimestampAssigner((event, timestamp) -> event.getEventTime()));

DataStream<Rule> rulesUpdateStream = getRulesUpdateStream(env);
BroadcastStream<Rule> rulesStream =
rulesUpdateStream.broadcast(Descriptors.rulesDescriptor);

// Processing pipeline setup
DataStream<Alert> alerts = transactions.connect(rulesStream).process(new
DynamicKeyFunction())
.uid("DynamicKeyFunction").name("Dynamic Partitioning
Function").keyBy((keyed) -> keyed.getKey())
.connect(rulesStream).process(new
DynamicAlertFunction()).uid("DynamicAlertFunction")
.name("Dynamic Rule Evaluation Function");

private DataStream<Rule> getRulesUpdateStream(StreamExecutionEnvironment
env) throws IOException {

RulesSource.Type rulesSourceEnumType = getRulesSourceType();

KafkaSource<String> rulesSource = RulesSource.createRulesSource(config);
DataStream<String> rulesStrings = env
.fromSource(rulesSource, WatermarkStrategy.noWatermarks(), "KafkaRules")
.name(rulesSourceEnumType.getName()).setParallelism(1);
return RulesSource.stringsStreamToRules(rulesStrings);
}
----------------------------------------------
Watermark stays at -9223372036854775808. DynamicAlertFunction onTimer()
does not fire. In Web UI, I see "No Watermark (Watermarks are only
available if EventTime is used)"

Please help.

Thanks

Re: TM OOMKilled

Posted by Alexey Trenikhun <ye...@msn.com>.
From attached screenshot (18h) looks like memory leak to me

Thanks,
Alexey
________________________________
From: Xintong Song <to...@gmail.com>
Sent: Tuesday, February 15, 2022 7:20 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: TM OOMKilled

Thanks Alexey,

In my experience, common causes for TM OOMKill are:
1. RocksDB uses more memory than expected. Unfortunately, the memory hard limit is not supported by RocksDB. Flink conservatively estimates RocksDB's memory footprint and tunes its parameters accordingly, which is not 100% safe.
2. The job (connectors, udfs, and their dependencies) may need direct and native memory. When native memory is needed, increasing task off-heap memory may not be as helpful as increasing the jvm overhead.
3. There could also be memory leaks, leading to continuously increasing memory footprint. Based on your description that the OOM happens about every 2days, this is highly suspected.

For 1 & 2, increase jvm overhead would help. For 3, you many need to investigate the heap/thread dump to find out where the leak come from.

I'd suggest to first increase the jvm overhead see if it fix the problem. If the problem is not fixed, but the job runs longer before the OOM happens, then it's likely the 3rd case. Moreover, you can monitor the pod memory footprint changes if such metrics are available.


Thank you~

Xintong Song


On Tue, Feb 15, 2022 at 11:56 PM Alexey Trenikhun <ye...@msn.com>> wrote:
Hi Xintong,
I've checked - `state.backend.rocksdb.memory.managed` is not explicitly configured, so as you wrote it should be true by default.

Regarding task off-heap, I believe KafkaConsumer needed off-heap memory some time ago

________________________________
From: Xintong Song <to...@gmail.com>>
Sent: Monday, February 14, 2022 10:06 PM
To: Alexey Trenikhun <ye...@msn.com>>
Cc: Flink User Mail List <us...@flink.apache.org>>
Subject: Re: TM OOMKilled

Hi Alexey,

You may want to double check if `state.backend.rocksdb.memory.managed` is configured to `true`. (This should be `true` by default.)

Another question that may or may not be related. I noticed that you have configured 128MB task off-heap memory, which IIRC the default should be 0. Could you share what that is for?


Thank you~

Xintong Song


On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun <ye...@msn.com>> wrote:
Hello,
We use RocksDB, but there is no problem with Java heap, which is limited by 3.523gb, the problem with total container memory. The pod is killed not due OutOfMemoryError,  but because total container memory exceeds 10gb

Thanks,
Alexey
________________________________
From: Caizhi Weng <ts...@gmail.com>>
Sent: Monday, February 14, 2022 6:42:05 PM
To: Alexey Trenikhun <ye...@msn.com>>
Cc: Flink User Mail List <us...@flink.apache.org>>
Subject: Re: TM OOMKilled

Hi!

Heap memory usage depends heavily on your job and your state backend. Which state backend are you using and if possible could you share your user code or explain what operations your job is doing?

Alexey Trenikhun <ye...@msn.com>> 于2022年2月15日周二 05:17写道:
Hello,
We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *), once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't see failed save points, so I assume OOM happens right after savepoint taken. However OOMKilled doesn't happen on every save point, so maybe this is a random correlation.
I've reserved 2G for JVM overhead, but somehow it is not enough ? Any known issues with memory and savepoints? Any suggestions how to troubleshoot this?

 Final TaskExecutor Memory configuration:
   Total Process Memory:          10.000gb (10737418240 bytes)
     Total Flink Memory:          7.547gb (8103395328 bytes)
       Total JVM Heap Memory:     3.523gb (3783262149 bytes)
         Framework:               128.000mb (134217728 bytes)
         Task:                    3.398gb (3649044421 bytes)
       Total Off-heap Memory:     4.023gb (4320133179 bytes)
         Managed:                 3.019gb (3241358179 bytes)
         Total JVM Direct Memory: 1.005gb (1078775000 bytes)
           Framework:             128.000mb (134217728 bytes)
           Task:                  128.000mb (134217728 bytes)
           Network:               772.800mb (810339544 bytes)
     JVM Metaspace:               256.000mb (268435456 bytes)
     JVM Overhead:                2.203gb (2365587456 bytes)

Thanks,
Alexey

Re: TM OOMKilled

Posted by Xintong Song <to...@gmail.com>.
Thanks Alexey,

In my experience, common causes for TM OOMKill are:
1. RocksDB uses more memory than expected. Unfortunately, the memory hard
limit is not supported by RocksDB. Flink conservatively estimates RocksDB's
memory footprint and tunes its parameters accordingly, which is not 100%
safe.
2. The job (connectors, udfs, and their dependencies) may need direct and
native memory. When native memory is needed, increasing task off-heap
memory may not be as helpful as increasing the jvm overhead.
3. There could also be memory leaks, leading to continuously increasing
memory footprint. Based on your description that the OOM happens about
every 2days, this is highly suspected.

For 1 & 2, increase jvm overhead would help. For 3, you many need to
investigate the heap/thread dump to find out where the leak come from.

I'd suggest to first increase the jvm overhead see if it fix the problem.
If the problem is not fixed, but the job runs longer before the OOM
happens, then it's likely the 3rd case. Moreover, you can monitor the pod
memory footprint changes if such metrics are available.

Thank you~

Xintong Song



On Tue, Feb 15, 2022 at 11:56 PM Alexey Trenikhun <ye...@msn.com> wrote:

> Hi Xintong,
> I've checked - `state.backend.rocksdb.memory.managed` is not explicitly
> configured, so as you wrote it should be true by default.
>
> Regarding task off-heap, I believe KafkaConsumer needed off-heap memory
> some time ago
>
> ------------------------------
> *From:* Xintong Song <to...@gmail.com>
> *Sent:* Monday, February 14, 2022 10:06 PM
> *To:* Alexey Trenikhun <ye...@msn.com>
> *Cc:* Flink User Mail List <us...@flink.apache.org>
> *Subject:* Re: TM OOMKilled
>
> Hi Alexey,
>
> You may want to double check if `state.backend.rocksdb.memory.managed` is
> configured to `true`. (This should be `true` by default.)
>
> Another question that may or may not be related. I noticed that you have
> configured 128MB task off-heap memory, which IIRC the default should be 0.
> Could you share what that is for?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> Hello,
> We use RocksDB, but there is no problem with Java heap, which is limited
> by 3.523gb, the problem with total container memory. The pod is killed
> not due OutOfMemoryError,  but because total container memory exceeds 10gb
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Caizhi Weng <ts...@gmail.com>
> *Sent:* Monday, February 14, 2022 6:42:05 PM
> *To:* Alexey Trenikhun <ye...@msn.com>
> *Cc:* Flink User Mail List <us...@flink.apache.org>
> *Subject:* Re: TM OOMKilled
>
> Hi!
>
> Heap memory usage depends heavily on your job and your state backend.
> Which state backend are you using and if possible could you share your user
> code or explain what operations your job is doing?
>
> Alexey Trenikhun <ye...@msn.com> 于2022年2月15日周二 05:17写道:
>
> Hello,
> We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also
> have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *),
> once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on
> even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't
> see failed save points, so I assume OOM happens right after savepoint
> taken. However OOMKilled doesn't happen on every save point, so maybe this
> is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough ? Any
> known issues with memory and savepoints? Any suggestions how to
> troubleshoot this?
>
>  Final TaskExecutor Memory configuration:
>    Total Process Memory:          10.000gb (10737418240 bytes)
>      Total Flink Memory:          7.547gb (8103395328 bytes)
>        Total JVM Heap Memory:     3.523gb (3783262149 bytes)
>          Framework:               128.000mb (134217728 bytes)
>          Task:                    3.398gb (3649044421 bytes)
>        Total Off-heap Memory:     4.023gb (4320133179 bytes)
>          Managed:                 3.019gb (3241358179 bytes)
>          Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>            Framework:             128.000mb (134217728 bytes)
>            Task:                  128.000mb (134217728 bytes)
>            Network:               772.800mb (810339544 bytes)
>      JVM Metaspace:               256.000mb (268435456 bytes)
>      JVM Overhead:                2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>
>

Re: TM OOMKilled

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Xintong,
I've checked - `state.backend.rocksdb.memory.managed` is not explicitly configured, so as you wrote it should be true by default.

Regarding task off-heap, I believe KafkaConsumer needed off-heap memory some time ago

________________________________
From: Xintong Song <to...@gmail.com>
Sent: Monday, February 14, 2022 10:06 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: TM OOMKilled

Hi Alexey,

You may want to double check if `state.backend.rocksdb.memory.managed` is configured to `true`. (This should be `true` by default.)

Another question that may or may not be related. I noticed that you have configured 128MB task off-heap memory, which IIRC the default should be 0. Could you share what that is for?


Thank you~

Xintong Song


On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun <ye...@msn.com>> wrote:
Hello,
We use RocksDB, but there is no problem with Java heap, which is limited by 3.523gb, the problem with total container memory. The pod is killed not due OutOfMemoryError,  but because total container memory exceeds 10gb

Thanks,
Alexey
________________________________
From: Caizhi Weng <ts...@gmail.com>>
Sent: Monday, February 14, 2022 6:42:05 PM
To: Alexey Trenikhun <ye...@msn.com>>
Cc: Flink User Mail List <us...@flink.apache.org>>
Subject: Re: TM OOMKilled

Hi!

Heap memory usage depends heavily on your job and your state backend. Which state backend are you using and if possible could you share your user code or explain what operations your job is doing?

Alexey Trenikhun <ye...@msn.com>> 于2022年2月15日周二 05:17写道:
Hello,
We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *), once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't see failed save points, so I assume OOM happens right after savepoint taken. However OOMKilled doesn't happen on every save point, so maybe this is a random correlation.
I've reserved 2G for JVM overhead, but somehow it is not enough ? Any known issues with memory and savepoints? Any suggestions how to troubleshoot this?

 Final TaskExecutor Memory configuration:
   Total Process Memory:          10.000gb (10737418240 bytes)
     Total Flink Memory:          7.547gb (8103395328 bytes)
       Total JVM Heap Memory:     3.523gb (3783262149 bytes)
         Framework:               128.000mb (134217728 bytes)
         Task:                    3.398gb (3649044421 bytes)
       Total Off-heap Memory:     4.023gb (4320133179 bytes)
         Managed:                 3.019gb (3241358179 bytes)
         Total JVM Direct Memory: 1.005gb (1078775000 bytes)
           Framework:             128.000mb (134217728 bytes)
           Task:                  128.000mb (134217728 bytes)
           Network:               772.800mb (810339544 bytes)
     JVM Metaspace:               256.000mb (268435456 bytes)
     JVM Overhead:                2.203gb (2365587456 bytes)

Thanks,
Alexey

Re: TM OOMKilled

Posted by Xintong Song <to...@gmail.com>.
Hi Alexey,

You may want to double check if `state.backend.rocksdb.memory.managed` is
configured to `true`. (This should be `true` by default.)

Another question that may or may not be related. I noticed that you have
configured 128MB task off-heap memory, which IIRC the default should be 0.
Could you share what that is for?

Thank you~

Xintong Song



On Tue, Feb 15, 2022 at 12:10 PM Alexey Trenikhun <ye...@msn.com> wrote:

> Hello,
> We use RocksDB, but there is no problem with Java heap, which is limited
> by 3.523gb, the problem with total container memory. The pod is killed
> not due OutOfMemoryError,  but because total container memory exceeds 10gb
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Caizhi Weng <ts...@gmail.com>
> *Sent:* Monday, February 14, 2022 6:42:05 PM
> *To:* Alexey Trenikhun <ye...@msn.com>
> *Cc:* Flink User Mail List <us...@flink.apache.org>
> *Subject:* Re: TM OOMKilled
>
> Hi!
>
> Heap memory usage depends heavily on your job and your state backend.
> Which state backend are you using and if possible could you share your user
> code or explain what operations your job is doing?
>
> Alexey Trenikhun <ye...@msn.com> 于2022年2月15日周二 05:17写道:
>
> Hello,
> We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also
> have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *),
> once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on
> even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't
> see failed save points, so I assume OOM happens right after savepoint
> taken. However OOMKilled doesn't happen on every save point, so maybe this
> is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough ? Any
> known issues with memory and savepoints? Any suggestions how to
> troubleshoot this?
>
>  Final TaskExecutor Memory configuration:
>    Total Process Memory:          10.000gb (10737418240 bytes)
>      Total Flink Memory:          7.547gb (8103395328 bytes)
>        Total JVM Heap Memory:     3.523gb (3783262149 bytes)
>          Framework:               128.000mb (134217728 bytes)
>          Task:                    3.398gb (3649044421 bytes)
>        Total Off-heap Memory:     4.023gb (4320133179 bytes)
>          Managed:                 3.019gb (3241358179 bytes)
>          Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>            Framework:             128.000mb (134217728 bytes)
>            Task:                  128.000mb (134217728 bytes)
>            Network:               772.800mb (810339544 bytes)
>      JVM Metaspace:               256.000mb (268435456 bytes)
>      JVM Overhead:                2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>
>

Re: TM OOMKilled

Posted by Alexey Trenikhun <ye...@msn.com>.
Hello,
We use RocksDB, but there is no problem with Java heap, which is limited by 3.523gb, the problem with total container memory. The pod is killed not due OutOfMemoryError,  but because total container memory exceeds 10gb

Thanks,
Alexey
________________________________
From: Caizhi Weng <ts...@gmail.com>
Sent: Monday, February 14, 2022 6:42:05 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: TM OOMKilled

Hi!

Heap memory usage depends heavily on your job and your state backend. Which state backend are you using and if possible could you share your user code or explain what operations your job is doing?

Alexey Trenikhun <ye...@msn.com>> 于2022年2月15日周二 05:17写道:
Hello,
We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *), once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't see failed save points, so I assume OOM happens right after savepoint taken. However OOMKilled doesn't happen on every save point, so maybe this is a random correlation.
I've reserved 2G for JVM overhead, but somehow it is not enough ? Any known issues with memory and savepoints? Any suggestions how to troubleshoot this?

 Final TaskExecutor Memory configuration:
   Total Process Memory:          10.000gb (10737418240 bytes)
     Total Flink Memory:          7.547gb (8103395328 bytes)
       Total JVM Heap Memory:     3.523gb (3783262149 bytes)
         Framework:               128.000mb (134217728 bytes)
         Task:                    3.398gb (3649044421 bytes)
       Total Off-heap Memory:     4.023gb (4320133179 bytes)
         Managed:                 3.019gb (3241358179 bytes)
         Total JVM Direct Memory: 1.005gb (1078775000 bytes)
           Framework:             128.000mb (134217728 bytes)
           Task:                  128.000mb (134217728 bytes)
           Network:               772.800mb (810339544 bytes)
     JVM Metaspace:               256.000mb (268435456 bytes)
     JVM Overhead:                2.203gb (2365587456 bytes)

Thanks,
Alexey

Re: TM OOMKilled

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Heap memory usage depends heavily on your job and your state backend. Which
state backend are you using and if possible could you share your user code
or explain what operations your job is doing?

Alexey Trenikhun <ye...@msn.com> 于2022年2月15日周二 05:17写道:

> Hello,
> We run Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM, we also
> have Kubernetes cron job which takes savepoint every 2 hour (14 */2 * * *),
> once in while (~1 per 2 days) TM is OOMKilled, suspiciously it happens on
> even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18) but I don't
> see failed save points, so I assume OOM happens right after savepoint
> taken. However OOMKilled doesn't happen on every save point, so maybe this
> is a random correlation.
> I've reserved 2G for JVM overhead, but somehow it is not enough ? Any
> known issues with memory and savepoints? Any suggestions how to
> troubleshoot this?
>
>  Final TaskExecutor Memory configuration:
>    Total Process Memory:          10.000gb (10737418240 bytes)
>      Total Flink Memory:          7.547gb (8103395328 bytes)
>        Total JVM Heap Memory:     3.523gb (3783262149 bytes)
>          Framework:               128.000mb (134217728 bytes)
>          Task:                    3.398gb (3649044421 bytes)
>        Total Off-heap Memory:     4.023gb (4320133179 bytes)
>          Managed:                 3.019gb (3241358179 bytes)
>          Total JVM Direct Memory: 1.005gb (1078775000 bytes)
>            Framework:             128.000mb (134217728 bytes)
>            Task:                  128.000mb (134217728 bytes)
>            Network:               772.800mb (810339544 bytes)
>      JVM Metaspace:               256.000mb (268435456 bytes)
>      JVM Overhead:                2.203gb (2365587456 bytes)
>
> Thanks,
> Alexey
>