Posted to issues@flink.apache.org by "gyfora (via GitHub)" <gi...@apache.org> on 2023/04/20 10:33:49 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #571: [FLINK-31827] Compute output ratio per edge

gyfora opened a new pull request, #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571

   ## What is the purpose of the change
   
   This change introduces per-edge output ratio tracking for the JobGraph instead of the previous per-vertex tracking.
   The previous approach assumed that all downstream vertices receive the same output (that is, all edges have the same num records out), which is not true when side outputs or more complex operator chains are present in the JobVertex.
   
   While Flink does not directly expose the sent/received record counts per edge, in many cases they can be computed:
    - If the upstream vertex has a single output, we use its num records out
    - If the downstream vertex has a single input, we use the downstream num records in
    - If the downstream vertex only has inputs that each have a single output, we subtract the num records out of the other upstream inputs from the downstream num records in
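
The three cases above can be sketched roughly as follows. This is a simplified illustration, not the code in the PR: the class `EdgeRateSketch`, the method `edgeRate`, and the use of plain `String` vertex ids with pre-aggregated rate maps are all assumptions made for the example. The order of the checks (downstream input rate first) follows the change log's stated preference for num records in over num records out.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the per-edge rate logic. */
final class EdgeRateSketch {

    /**
     * Estimates the records/sec flowing over the edge (from -> to).
     * Returns NaN when none of the three cases applies.
     * Assumes every vertex referenced has an entry in the maps it is looked up in.
     */
    static double edgeRate(
            String from,
            String to,
            Map<String, List<String>> inputs, // vertex -> upstream vertices
            Map<String, List<String>> outputs, // vertex -> downstream vertices
            Map<String, Double> recordsIn, // vertex -> num records in per second
            Map<String, Double> recordsOut) { // vertex -> num records out per second

        List<String> toInputs = inputs.get(to);

        // Downstream vertex has a single input: its input rate is the edge rate.
        if (toInputs.size() == 1) {
            return recordsIn.get(to);
        }

        // Upstream vertex has a single output: its output rate is the edge rate.
        if (outputs.get(from).size() == 1) {
            return recordsOut.get(from);
        }

        // All other inputs of the downstream vertex have a single output:
        // subtract their output rates from the downstream input rate.
        double otherInputs = 0;
        for (String input : toInputs) {
            if (input.equals(from)) {
                continue; // only the other input edges are subtracted
            }
            if (outputs.get(input).size() != 1) {
                return Double.NaN; // rate of this other edge is not known
            }
            otherInputs += recordsOut.get(input);
        }
        return recordsIn.get(to) - otherInputs;
    }
}
```

For a topology with edges A -> C, A -> D and B -> C, where C receives 100 records/s, D receives 20 records/s and B emits 30 records/s, the sketch attributes 20 to (A, D), 30 to (B, C) and 100 - 30 = 70 to (A, C), without consulting A's own num records out.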
    
    As a related change, we should also introduce per-edge record count metrics in Flink, which would allow the autoscaler algorithm to use them directly when enabled.
   
   ## Brief change log
   
     - *Remove per-vertex OUTPUT_RATIO and TRUE_OUTPUT_RATE metrics*
     - *Rework the collected metrics / metric history to allow storing per-edge output ratio metrics*
     - *Compute per-edge output ratio depending on the topology*
     - *Use downstream num records in whenever possible instead of upstream num records out as the latter is very unreliable due to some Flink bugs (https://issues.apache.org/jira/browse/FLINK-18808 & https://issues.apache.org/jira/browse/FLINK-31752)*
   
   ## Verifying this change
   
   New unit tests were added for the output ratio computation; existing tests cover the current behaviour.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no
     - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora merged PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571






[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571#discussion_r1172429181


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -153,37 +169,64 @@ private static double getNumRecordsInPerSecond(
     }
 
     private static double getNumRecordsOutPerSecond(
-            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            JobVertexID jobVertexID,
-            boolean isSource,
-            double numRecordsInPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            // If the input rate is zero, we also need to flatten the output rate.
-            // Otherwise, the OUTPUT_RATIO would be outrageously large, leading to
-            // a rapid scale up.
-            return 0;
-        }
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID jobVertexID) {
+
         AggregatedMetric numRecordsOutPerSecond =
                 flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
         if (numRecordsOutPerSecond == null) {
-            if (isSource) {
-                numRecordsOutPerSecond =
-                        flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
-            }
-            if (numRecordsOutPerSecond == null) {
-                LOG.warn("Received null output rate for {}. Returning NaN.", jobVertexID);
-                return Double.NaN;
-            }
+            LOG.warn("Received null output rate for {}. Returning NaN.", jobVertexID);
+            return Double.NaN;
         }
         return numRecordsOutPerSecond.getSum();
     }
 
-    private static double computeOutputRatio(
-            double numRecordsInPerSecond, double numRecordsOutPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            return 0;
+    private static double computeEdgeOutPerSecond(
+            JobTopology topology,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+            JobVertexID from,
+            JobVertexID to) {
+        var toMetrics = flinkMetrics.get(to);
+
+        var toVertexInputs = topology.getInputs().get(to);
+        // Case 1: Downstream vertex has a single input (from) so we can use the most reliable num
+        // records in
+        if (toVertexInputs.size() == 1) {
+            LOG.debug(
+                    "Computing edge ({}, {}) data rate for single input downstream task", from, to);
+            return getNumRecordsInPerSecond(toMetrics, to, false);
+        }
+
+        // Case 2: Downstream vertex has only inputs from upstream vertices which don't have other
+        // outputs
+        double sumOtherUpstreamInputs = 0;
+        for (JobVertexID input : toVertexInputs) {
+            if (input.equals(from)) {
+                continue;

Review Comment:
   ```suggestion
                   // Exclude source edge because we only want to consider other input edges
                   continue;
   ```



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -153,37 +169,64 @@ private static double getNumRecordsInPerSecond(
     }
 
     private static double getNumRecordsOutPerSecond(
-            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            JobVertexID jobVertexID,
-            boolean isSource,
-            double numRecordsInPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            // If the input rate is zero, we also need to flatten the output rate.
-            // Otherwise, the OUTPUT_RATIO would be outrageously large, leading to
-            // a rapid scale up.
-            return 0;
-        }
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID jobVertexID) {
+
         AggregatedMetric numRecordsOutPerSecond =
                 flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
         if (numRecordsOutPerSecond == null) {
-            if (isSource) {
-                numRecordsOutPerSecond =
-                        flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
-            }
-            if (numRecordsOutPerSecond == null) {
-                LOG.warn("Received null output rate for {}. Returning NaN.", jobVertexID);
-                return Double.NaN;
-            }
+            LOG.warn("Received null output rate for {}. Returning NaN.", jobVertexID);
+            return Double.NaN;
         }
         return numRecordsOutPerSecond.getSum();
     }
 
-    private static double computeOutputRatio(
-            double numRecordsInPerSecond, double numRecordsOutPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            return 0;
+    private static double computeEdgeOutPerSecond(
+            JobTopology topology,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+            JobVertexID from,
+            JobVertexID to) {
+        var toMetrics = flinkMetrics.get(to);
+
+        var toVertexInputs = topology.getInputs().get(to);
+        // Case 1: Downstream vertex has a single input (from) so we can use the most reliable num
+        // records in
+        if (toVertexInputs.size() == 1) {
+            LOG.debug(
+                    "Computing edge ({}, {}) data rate for single input downstream task", from, to);
+            return getNumRecordsInPerSecond(toMetrics, to, false);
+        }
+
+        // Case 2: Downstream vertex has only inputs from upstream vertices which don't have other
+        // outputs
+        double sumOtherUpstreamInputs = 0;

Review Comment:
   ```suggestion
           double numRecordsOutFromUpstreamInputs = 0;
   ```



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java:
##########
@@ -86,20 +87,24 @@ public AutoScalerInfo(Map<String, String> data) {
         configMap.setData(Preconditions.checkNotNull(data));
     }
 
-    @SneakyThrows
-    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() {
+    public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
         var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
         if (historyYaml == null) {
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        try {
+            return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format changed. Discarding...");
+            return new TreeMap<>();

Review Comment:
   This will clear all existing metrics when we deploy this new version.



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java:
##########
@@ -17,18 +17,19 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import java.time.Instant;
 import java.util.Map;
-import java.util.SortedMap;
 
-/** Topology and collected metric history. */
-@Value
+/** Collected scaling metrics. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class CollectedMetrics {
-    JobTopology jobTopology;
-    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory;
+    private Map<JobVertexID, Map<ScalingMetric, Double>> vertexMetrics;
+    private Map<Edge, Double> outputRatios;

Review Comment:
   This will increase the size of the metric history. We may have to pursue https://issues.apache.org/jira/browse/FLINK-31866 for future upgrades.
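
To make the `outputRatios` field in the hunk above more concrete, here is a minimal sketch of what a per-edge ratio map could hold. It assumes that the ratio of an edge is the edge's output rate divided by the input rate of its upstream vertex, and that a non-positive input rate yields 0 (mirroring the guard in the removed per-vertex `computeOutputRatio`). The `Edge` class below is a hypothetical stand-in with `String` ids; the PR's own `Edge` class is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class OutputRatioSketch {

    /** Hypothetical edge key with value semantics so it can be used in a map. */
    static final class Edge {
        final String from;
        final String to;

        Edge(String from, String to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return from.equals(other.from) && to.equals(other.to);
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }

    /** Assumed definition: records out on the edge per record read by the upstream vertex. */
    static double outputRatio(double fromRecordsInPerSec, double edgeRecordsOutPerSec) {
        if (fromRecordsInPerSec <= 0) {
            return 0; // avoid an unbounded ratio when the upstream vertex is idle
        }
        return edgeRecordsOutPerSec / fromRecordsInPerSec;
    }

    /** Builds the ratio map for two outgoing edges of a vertex "A" (illustrative ids). */
    static Map<Edge, Double> example() {
        Map<Edge, Double> ratios = new HashMap<>();
        ratios.put(new Edge("A", "C"), outputRatio(100, 70));
        ratios.put(new Edge("A", "D"), outputRatio(100, 20));
        return ratios;
    }
}
```

Each observation in the history then carries one entry per edge in addition to the per-vertex metrics, which is where the size increase mentioned above comes from.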



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java:
##########
@@ -86,20 +87,24 @@ public AutoScalerInfo(Map<String, String> data) {
         configMap.setData(Preconditions.checkNotNull(data));
     }
 
-    @SneakyThrows
-    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() {
+    public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
         var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
         if (historyYaml == null) {
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        try {
+            return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format changed. Discarding...");
+            return new TreeMap<>();

Review Comment:
   I think this will introduce a bug where only a single metric observation in a window is considered. That would be fixed via https://issues.apache.org/jira/browse/FLINK-31866 or https://issues.apache.org/jira/browse/FLINK-31867.



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+
+import lombok.Value;
+
+import java.time.Instant;
+import java.util.SortedMap;
+
+/** Topology and collected metric history. */
+@Value
+public class CollectedMetricHistory {
+    JobTopology jobTopology;

Review Comment:
   Why do we need to persist the JobTopology? Could we save some bytes here?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571#discussion_r1172469980


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java:
##########
@@ -86,20 +87,24 @@ public AutoScalerInfo(Map<String, String> data) {
         configMap.setData(Preconditions.checkNotNull(data));
     }
 
-    @SneakyThrows
-    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() {
+    public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
         var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
         if (historyYaml == null) {
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        try {
+            return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format changed. Discarding...");
+            return new TreeMap<>();

Review Comment:
   Maybe a better solution would be to throw an exception and clear the autoscaler info completely. That will basically reset the history. 
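
One possible reading of this suggestion, sketched with stand-in types: when the stored history cannot be decoded, drop all persisted autoscaler state rather than only returning an empty history. Everything here is hypothetical (the class `AutoscalerStateSketch`, the key name, and the toy `epochMillis=value` encoding); the real class stores compressed YAML in a ConfigMap and reads it with Jackson.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a ConfigMap-backed autoscaler state holder. */
final class AutoscalerStateSketch {
    static final String HISTORY_KEY = "metricHistory"; // hypothetical key name

    final Map<String, String> data = new HashMap<>();

    /** Toy format "epochMillis=value,epochMillis=value"; throws on anything else. */
    static SortedMap<Instant, Double> decode(String raw) {
        SortedMap<Instant, Double> history = new TreeMap<>();
        for (String entry : raw.split(",")) {
            String[] parts = entry.split("=");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed entry: " + entry);
            }
            history.put(
                    Instant.ofEpochMilli(Long.parseLong(parts[0])),
                    Double.parseDouble(parts[1]));
        }
        return history;
    }

    SortedMap<Instant, Double> getMetricHistory() {
        String raw = data.get(HISTORY_KEY);
        if (raw == null) {
            return new TreeMap<>();
        }
        try {
            return decode(raw);
        } catch (IllegalArgumentException e) {
            // NumberFormatException is a subclass of IllegalArgumentException.
            // Reset all stored state, not only the history entry.
            data.clear();
            return new TreeMap<>();
        }
    }
}
```

Clearing the whole map means any other entries stored alongside the history are also reset; whether that is sufficient depends on state kept elsewhere.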





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571#discussion_r1172471777


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+
+import lombok.Value;
+
+import java.time.Instant;
+import java.util.SortedMap;
+
+/** Topology and collected metric history. */
+@Value
+public class CollectedMetricHistory {
+    JobTopology jobTopology;

Review Comment:
   We don't persist the JobTopology in the `AutoScalerInfo`, only the metric history.





[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571#discussion_r1172575638


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java:
##########
@@ -86,20 +87,24 @@ public AutoScalerInfo(Map<String, String> data) {
         configMap.setData(Preconditions.checkNotNull(data));
     }
 
-    @SneakyThrows
-    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() {
+    public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
         var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
         if (historyYaml == null) {
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        try {
+            return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format changed. Discarding...");
+            return new TreeMap<>();

Review Comment:
   Actually that does not work without a fix that I'm working on right now. If you re-init, then the job update time can still be in the past which triggers filling the metric window immediately. PR pending which addresses this.





[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #571: [FLINK-31827] Compute output ratio per edge

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #571:
URL: https://github.com/apache/flink-kubernetes-operator/pull/571#discussion_r1172576486


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+
+import lombok.Value;
+
+import java.time.Instant;
+import java.util.SortedMap;
+
+/** Topology and collected metric history. */
+@Value
+public class CollectedMetricHistory {
+    JobTopology jobTopology;

Review Comment:
   ok


