You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Gyula Fora (Jira)" <ji...@apache.org> on 2023/03/05 18:27:00 UTC
[jira] [Commented] (FLINK-31326) Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0
[ https://issues.apache.org/jira/browse/FLINK-31326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696603#comment-17696603 ]
Gyula Fora commented on FLINK-31326:
------------------------------------
Good catch [~mateczagany] , there are a few oddities around source metrics at the moment when load / incoming data rate is low. After we merge [~mxm] 's work on the pending record / zero scaling improvements, we should revisit this config and simplify the code for the source metrics if possible.
> Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0
> ------------------------------------------------------------------------------------
>
> Key: FLINK-31326
> URL: https://issues.apache.org/jira/browse/FLINK-31326
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: Mate Czagany
> Priority: Major
>
> In case of 'scaling.sources.enabled'='false' the 'TARGET_DATA_RATE' of the source vertex will be calculated as '(1000 / busyTimeMsPerSecond) * numRecordsOutPerSecond' which currently on the main branch results in an infinite value if 'busyTimeMsPerSecond' is 0. This will also affect downstream operators.
> I'm not that familiar with the autoscaler code, but it's in my opinion it's quite unexpected to have such behavioral changes by setting 'scaling.sources.enabled' to false.
>
> With PR #543 for FLINK-30575 (https://github.com/apache/flink-kubernetes-operator/pull/543) scaling will happen even with 'busyTimeMsPerSecond' being 0, but it will result in unreasonably high parallelism numbers for downstream operators because 'TARGET_DATA_RATE' will be very high where 0 'busyTimeMsPerSecond' will be replaced with 1e-10.
> Metrics from the operator logs (source=e5a72f353fc1e6bbf3bd96a41384998c, sink=51312116a3e504bccb3874fc80d5055e)
> 'scaling.sources.enabled'='true':
> {code:java}
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: 5.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: 10.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: 3.8666666666666667
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: 3.8833333333333333
> jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 4.827299209321681
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.848351269098938
> jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: 10.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: 21.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: 7.733333333333333
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: 7.766666666666667{code}
> 'scaling.sources.enabled'='false':
> {code:java}
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: Infinity
> jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: NaN
> jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 5.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.980555555555556
> jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
> jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: NaN
> jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: Infinity
> jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: NaN{code}
>
> My guess is 'scaling.sources.enabled' exists to support connectors where `pendingRecords` is not available, but setting this to false also negatively impacts existing Kafka sources for example, and users cannot anticipate this from the documentation.
>
> I think it would be worth it to include this in the docs, or if anyone has any suggested solutions I would gladly look into implementing it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)