Posted to issues@flink.apache.org by "Tan Kim (Jira)" <ji...@apache.org> on 2023/04/30 09:23:00 UTC
[jira] [Updated] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tan Kim updated FLINK-31977:
----------------------------
Description:
The function below detects ineffective scale-ups.
It performs all of the computations needed for the detection first, and only then checks SCALING_EFFECTIVENESS_DETECTION_ENABLED (scaling.effectiveness.detection.enabled) to decide whether to report a result. When the option is false, all of that computation is wasted.
{code:java}
// JobVertexScaler.java #175
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate =
            lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
    var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how much of the expected
    // increase actually happened. For example if we expect a 100 increase in proc rate and only
    // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean withinEffectiveThreshold =
            (actualIncrease / expectedIncrease)
                    >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
    if (withinEffectiveThreshold) {
        return false;
    }

    var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
    eventRecorder.triggerEvent(
            resource,
            EventRecorder.Type.Normal,
            EventRecorder.Reason.IneffectiveScaling,
            EventRecorder.Component.Operator,
            message);

    // The flag is only consulted here, after all of the work above has already been done.
    if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
        LOG.info(message);
        return true;
    } else {
        return false;
    }
} {code}
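To make the threshold check concrete, here is a minimal standalone sketch (not operator code; the class name, the current processing rate of 24000.0, and the 0.1 threshold are assumed values for illustration) that reproduces the effectiveness computation with the sample rates from the inline comments:

```java
// Hypothetical standalone sketch of the effectiveness computation above.
// The last and expected rates come from the inline comments in the snippet;
// the current rate (24000.0) and the 0.1 threshold are assumptions.
public class EffectivenessExample {

    // Fraction of the expected processing-rate increase that actually happened.
    static double effectiveness(
            double lastProcRate, double lastExpectedProcRate, double currentProcRate) {
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return actualIncrease / expectedIncrease;
    }

    public static void main(String[] args) {
        double ratio = effectiveness(22569.3, 37340.0, 24000.0);
        // A ratio below the configured threshold marks the scale-up as ineffective.
        System.out.println(ratio < 0.1 ? "ineffective" : "effective");
    }
}
```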
It would be better to check SCALING_EFFECTIVENESS_DETECTION_ENABLED first and call the function only when the option is enabled, as in the if statement added to its caller below.
{code:java}
// JobVertexScaler.java #150
if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
    if (scaledUp) {
        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
        } else {
            return true;
        }
    } else {
        return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
    }
}{code}
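With the flag hoisted into the caller as proposed, the trailing flag check inside detectIneffectiveScaleUp becomes redundant and the detection work is skipped entirely when the option is off. A minimal self-contained sketch of that guard-in-caller pattern, using simplified stand-in names and boolean semantics rather than the real operator types:

```java
// Hypothetical sketch of the guard-in-caller pattern from the proposal.
// All names and return semantics are simplified stand-ins, not the real
// operator classes; a counter makes the skipped work observable.
public class GuardInCallerSketch {

    static int detectionCalls = 0; // counts how often the detection work runs

    // Stand-in for detectIneffectiveScaleUp: always does the detection work.
    static boolean detectIneffective(
            double lastRate, double expectedRate, double currentRate, double threshold) {
        detectionCalls++;
        return (currentRate - lastRate) / (expectedRate - lastRate) < threshold;
    }

    // Stand-in for the caller: because the flag is checked here, the
    // detection computation never runs when the option is disabled.
    static boolean blockScaling(
            boolean detectionEnabled,
            double lastRate, double expectedRate, double currentRate, double threshold) {
        if (detectionEnabled) {
            return detectIneffective(lastRate, expectedRate, currentRate, threshold);
        }
        return false; // detection disabled: no detection work at all
    }

    public static void main(String[] args) {
        blockScaling(false, 22569.3, 37340.0, 24000.0, 0.1);
        System.out.println("detection calls with flag off: " + detectionCalls);
        blockScaling(true, 22569.3, 37340.0, 24000.0, 0.1);
        System.out.println("detection calls with flag on: " + detectionCalls);
    }
}
```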
was:
The function below detects ineffective scale-ups.
It performs all of the computations needed for the detection first, and only then checks SCALING_EFFECTIVENESS_DETECTION_ENABLED (scaling.effectiveness.detection.enabled) to decide whether to report a result. When the option is false, all of that computation is wasted.
{code:java}
// JobVertexScaler.java #175
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate =
            lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
    var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how much of the expected
    // increase actually happened. For example if we expect a 100 increase in proc rate and only
    // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean withinEffectiveThreshold =
            (actualIncrease / expectedIncrease)
                    >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
    if (withinEffectiveThreshold) {
        return false;
    }

    var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
    eventRecorder.triggerEvent(
            resource,
            EventRecorder.Type.Normal,
            EventRecorder.Reason.IneffectiveScaling,
            EventRecorder.Component.Operator,
            message);

    if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
        LOG.info(message);
        return true;
    } else {
        return false;
    }
} {code}
It would be better to check SCALING_EFFECTIVENESS_DETECTION_ENABLED first and call the function only when the option is enabled, as in the if statement added to its caller below.
{code:java}
// JobVertexScaler.java #150
if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
    if (scaledUp) {
        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
        } else {
            return true;
        }
    } else {
        return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
    }
}{code}
> If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Affects Versions: 1.17.0
> Reporter: Tan Kim
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)