Posted to issues@flink.apache.org by "1996fanrui (via GitHub)" <gi...@apache.org> on 2023/09/18 08:32:01 UTC

[GitHub] [flink-kubernetes-operator] 1996fanrui opened a new pull request, #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

1996fanrui opened a new pull request, #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677

   ## What is the purpose of the change
   
   The first subtask of FLIP-334.
   
   ## Brief change log
   
   [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces
   
   - Users will re-implement `EventHandler`, `StateStore` and `ScalingRealizer`, so these classes are marked `@Experimental`.
   - Users will use the `Flink-Kubernetes-Operator` or `StandaloneAutoscaler` and won't use `JobAutoScaler` directly, so `JobAutoScaler` is marked `@Internal`.
   
   
   ## Verifying this change
   
   This PR only initializes the generic autoscaler module and interfaces; all tests will be added in the subsequent PR.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API: yes
     - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343663466


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   I think we should use the shaded dependency that comes with the Flink client version we are depending on. That is the simplest way to avoid any potential issues :) 





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765929234

   > Hi @mxm @gyfora , thanks for your hard work on the review!
   > 
   > I have now addressed all comments on this PR. Some interfaces, the code design and the code style are improved, and there were no big changes during the review. I have updated the FLIP doc with the latest interfaces.
   > 
   > About manual testing: I rebuilt the image just now to check that it works, and ran the `AutoscalingExample` with `autoscaling.yaml`. In general, it works well. There are some WARN logs because the demo job still uses deprecated configuration keys, and I have updated them in the last fix commit.
   > 
   > Please help review again in your free time, big thanks~ ❤️
   > 
   > I can squash the last commit after your check. Also, please let me know if you think more manual testing is needed, thanks.
   > 
   > ```
   > 2023-10-17 15:33:56,930 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
   > 2023-10-17 15:33:56,931 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
   > 2023-10-17 15:33:57,205 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Job updated at 2023-10-17T07:32:56.133Z. Clearing metrics.
   > 2023-10-17 15:33:57,331 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.metrics.window' instead of proper key 'job.autoscaler.metrics.window'
   > 2023-10-17 15:33:57,332 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.stabilization.interval' instead of proper key 'job.autoscaler.stabilization.interval'
   > 2023-10-17 15:33:57,397 o.a.f.a.ScalingMetricCollector [WARN ] [default.autoscaling-example] pendingRecords metric for cbc357ccb763df2852fee8c4fc7d55f2 could not be found. Either a legacy source or an idle source. Assuming no pending records.
   > 2023-10-17 15:33:57,620 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Metric window not full until 2023-10-17T07:36:56.931915Z
   > 2023-10-17 15:33:57,623 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
   > 2023-10-17 15:33:57,624 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
   > ```
   
   Thank you @1996fanrui for the persistence and all the nice work. I will do a final pass on this later today, let's try to merge this as soon as possible to unblock current work on the autoscaler :) 




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352282612


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     protected final EventRecorder eventRecorder;
     protected final StatusRecorder<CR, STATUS> statusRecorder;
-    protected final JobAutoScaler resourceScaler;
+    protected final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> resourceScaler;

Review Comment:
   Can we rename this?
   
   ```suggestion
       protected final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
   ```





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765865647

   Hi @mxm @gyfora , thanks for your hard work on the review!
   
   I have now addressed all comments on this PR. Some interfaces, the code design and the code style are improved, and there were no big changes during the review. I have updated the FLIP doc with the latest interfaces.
   
   About manual testing: I rebuilt the image just now to check that it works, and ran the `AutoscalingExample` with `autoscaling.yaml`. In general, it works well. There are some WARN logs because the demo job still uses deprecated configuration keys, and I have updated them in the last fix commit.
   
   Please help review again in your free time, big thanks~ ❤️
   
   I can squash the last commit after your check. Also, please let me know if you think more manual testing is needed, thanks.
   
   ```
   2023-10-17 15:33:56,930 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
   2023-10-17 15:33:56,931 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
   2023-10-17 15:33:57,205 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Job updated at 2023-10-17T07:32:56.133Z. Clearing metrics.
   2023-10-17 15:33:57,331 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.metrics.window' instead of proper key 'job.autoscaler.metrics.window'
   2023-10-17 15:33:57,332 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.stabilization.interval' instead of proper key 'job.autoscaler.stabilization.interval'
   2023-10-17 15:33:57,397 o.a.f.a.ScalingMetricCollector [WARN ] [default.autoscaling-example] pendingRecords metric for cbc357ccb763df2852fee8c4fc7d55f2 could not be found. Either a legacy source or an idle source. Assuming no pending records.
   2023-10-17 15:33:57,620 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Metric window not full until 2023-10-17T07:36:56.931915Z
   2023-10-17 15:33:57,623 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
   2023-10-17 15:33:57,624 o.a.f.c.Configuration          [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
   ```




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360079090


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -441,15 +442,21 @@ protected Collection<AggregatedMetric> queryAggregatedMetricNames(
 
     protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
             queryAllAggregatedMetrics(
-                    AbstractFlinkResource<?, ?> cr,
-                    FlinkService flinkService,
-                    Configuration conf,
+                    Context ctx,
                     Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
 
-    public void cleanup(AbstractFlinkResource<?, ?> cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext<KEY> context, Duration clientTimeout) throws Exception {

Review Comment:
   Thanks @mateczagany for this comment.
   
   IIUC, you mean that `ScalingMetricCollector` uses the `RestClusterClient` and `RestApiMetricsCollector` is entirely based on `RestClusterClient`, so these 2 classes can be merged into one class, right?
   
   If so, let me try to explain the difference between `RestApiMetricsCollector` and `ScalingMetricCollector`.
   
   - `RestApiMetricsCollector` calls `RestClusterClient`, and it's used to fetch specific metrics.
   - `ScalingMetricCollector` calls `RestClusterClient` and is not used to fetch specific metrics.
   - `RestClusterClient` is used in `ScalingMetricCollector` to get some job metadata, such as: `getJobDetailsInfo` to generate the `JobTopology`, `queryFilteredMetricNames`, `updateKafkaSourceMaxParallelisms`.
   - The `JobTopology` is the metadata of the job, and it cannot be fetched from metrics. That means the `RestClusterClient` is needed even if we query specific metrics from another system.
   
   Based on this, it may be better to keep `ScalingMetricCollector` as an abstract class and not remove `RestApiMetricsCollector`. That keeps it easy to fetch specific metrics from another system in the future.
   
   Also, `ScalingMetricCollector` already uses the `RestClusterClient` on the current master branch [1].
   
   WDYT? And please correct me if my understanding is wrong, thanks~
   
   [1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L368C45-L368C45
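   
   The split described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the classes from the PR: `JobMetadataClient`, `MetricCollectorSketch`, `RestMetricCollectorSketch` and `ExternalMetricCollectorSketch` are hypothetical stand-ins, and vertex ids and metrics are reduced to plain strings and doubles. The point is only that job metadata always comes through the cluster client, while the metric source stays pluggable.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical stand-in for the REST cluster client: serves job metadata
   // and, optionally, metrics.
   interface JobMetadataClient {
       List<String> vertexIds();                       // job topology (metadata)

       Map<String, Double> restMetrics(String vertex); // metrics exposed over REST
   }

   // Base collector: always uses the client for the topology, but leaves the
   // metric source to subclasses.
   abstract class MetricCollectorSketch {
       protected final JobMetadataClient client;

       MetricCollectorSketch(JobMetadataClient client) {
           this.client = client;
       }

       // Template method: topology from the client, metrics from the subclass.
       final Map<String, Map<String, Double>> collect() {
           Map<String, Map<String, Double>> result = new TreeMap<>();
           for (String vertex : client.vertexIds()) {
               result.put(vertex, queryMetrics(vertex));
           }
           return result;
       }

       protected abstract Map<String, Double> queryMetrics(String vertex);
   }

   // Variant that also fetches the metrics through the same client.
   class RestMetricCollectorSketch extends MetricCollectorSketch {
       RestMetricCollectorSketch(JobMetadataClient client) {
           super(client);
       }

       @Override
       protected Map<String, Double> queryMetrics(String vertex) {
           return client.restMetrics(vertex);
       }
   }

   // Variant that reads metrics from some other system (here: a fixed map),
   // yet still depends on the client for the topology.
   class ExternalMetricCollectorSketch extends MetricCollectorSketch {
       private final Map<String, Map<String, Double>> external;

       ExternalMetricCollectorSketch(
               JobMetadataClient client, Map<String, Map<String, Double>> external) {
           super(client);
           this.external = external;
       }

       @Override
       protected Map<String, Double> queryMetrics(String vertex) {
           return external.getOrDefault(vertex, Collections.<String, Double>emptyMap());
       }
   }
   ```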





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360856610


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -441,15 +442,21 @@ protected Collection<AggregatedMetric> queryAggregatedMetricNames(
 
     protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
             queryAllAggregatedMetrics(
-                    AbstractFlinkResource<?, ?> cr,
-                    FlinkService flinkService,
-                    Configuration conf,
+                    Context ctx,
                     Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
 
-    public void cleanup(AbstractFlinkResource<?, ?> cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext<KEY> context, Duration clientTimeout) throws Exception {

Review Comment:
   That makes sense, I think you've explained it very well, thank you! 
   
   I still think that the naming and the abstraction can be a bit confusing for a fresh pair of eyes, but as you've said and as I've said in my original comment, this is not related to your PR, so I think it's perfectly fine to leave it as is.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355160155


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());
+
+        CommonStatus<?> status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
   (Didn't see your comments until I hit refresh). Fine to defer this.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355174254


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   Let's keep the existing option. We can move it here like you did but it should continue to support `flink.client.timeout`. We can add it as a deprecated key.
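   
   To illustrate what supporting an old key as a deprecated key means in practice, here is a minimal stand-alone sketch of a lookup that prefers the current key and falls back to older ones. It is not Flink's `Configuration` implementation: `OptionSketch` and `resolve` are hypothetical names, and the concrete key strings used in the usage line below are assumptions chosen for illustration.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical stand-in for a config option with deprecated aliases.
   final class OptionSketch {
       final String key;
       final List<String> deprecatedKeys;
       final String defaultValue;

       OptionSketch(String key, String defaultValue, String... deprecatedKeys) {
           this.key = key;
           this.defaultValue = defaultValue;
           this.deprecatedKeys = Arrays.asList(deprecatedKeys);
       }

       // Returns the value of the current key if set, otherwise the first
       // deprecated key that is set, otherwise the default.
       String resolve(Map<String, String> conf) {
           if (conf.containsKey(key)) {
               return conf.get(key);
           }
           Optional<String> legacy =
                   deprecatedKeys.stream().filter(conf::containsKey).findFirst();
           if (legacy.isPresent()) {
               // A real implementation would typically log a deprecation warning here.
               return conf.get(legacy.get());
           }
           return defaultValue;
       }
   }
   ```
   
   With an option such as `new OptionSketch("job.autoscaler.flink.client.timeout", "10 s", "kubernetes.operator.flink.client.timeout")` (both key names assumed), existing configurations that only set the old key keep working, while the new key wins when both are present.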





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356604652


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
   Looks good to me. Just wanted to double-check! Thank you.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356017440


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   Thanks @XComp for the detailed analysis.
   
   > Do we need to shade the jackson dependency?
   
   IIUC, it's not required now. If it were required, `flink-kubernetes-operator` would already have had to shade it.
   
   I prefer to use the shaded version in the future, and the motivation is as you mentioned: we have custom auto-scaler implementations.
   
   The Flink shaded version should be enough, so I agree with @gyfora and @mxm : let's use the version bundled with the Flink client. And we can re-shade it if it doesn't work well in the future. WDYT?
   
   
   -----------------------------
   
   The second question:
   
   This PR is still using the jackson version of `flink-kubernetes-operator` instead of the flink shaded version. I want to update it after `flink-1.18` is released.
   
   The reason is: the current autoscaler uses `loaderOptions` to limit the serialized size.
   
   - The shaded jackson version of `flink-1.17` is `2.13.4-16.1`, which doesn't support `loaderOptions`.
   - The shaded jackson version of `flink-1.18` is `2.14.2-17.0`, which supports `loaderOptions`.
   
   What do you think about updating it after `flink-1.18` is released? It should be released soon. If it makes sense, I can create a JIRA to follow it.





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1328450023


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   > improvement1
   
   improvement1 makes sense to me, and I need to improve the `SerializableState`. The current `deserialize` method needs to create an object first, and then call `object.deserialize(serializedResult)`. In general, the serializer is a separate object.
   
   I'm afraid it may be too complex if we introduce 2 classes for each state.
   
   > improvement2
   
   For improvement2, my concern is that the serialized type changes, so existing jobs would not be directly compatible.
   
   `byte[]` is certainly more general than `String`, but the benefits it brings are uncertain (because there may not be classes that can only be serialized into `byte[]` in the future).
   
   The negative impact is certain, and it will bring additional migration costs to existing users.
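   
   One way to picture the "serializer as a separate object" point: the state class stays a plain value, and a stateless serializer turns it into a `String` and back, so no instance is needed before deserializing. This is only a sketch; `StateSerializerSketch`, `ParallelismOverrides` and the `vertex=parallelism;` wire format are made up for illustration and are not part of the PR. It also shows the cost mentioned above: two types per state.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical serializer kept separate from the state it handles.
   interface StateSerializerSketch<T> {
       String serialize(T state);

       T deserialize(String serialized);
   }

   // Hypothetical state object: a plain value with no serialization logic.
   final class ParallelismOverrides {
       final Map<String, Integer> byVertex;

       ParallelismOverrides(Map<String, Integer> byVertex) {
           this.byVertex = new TreeMap<>(byVertex);
       }
   }

   // Encodes the overrides as "vertex=parallelism;..." (format is an assumption).
   final class ParallelismOverridesSerializer
           implements StateSerializerSketch<ParallelismOverrides> {

       @Override
       public String serialize(ParallelismOverrides state) {
           StringBuilder sb = new StringBuilder();
           state.byVertex.forEach((v, p) -> sb.append(v).append('=').append(p).append(';'));
           return sb.toString();
       }

       @Override
       public ParallelismOverrides deserialize(String serialized) {
           Map<String, Integer> result = new TreeMap<>();
           for (String entry : serialized.split(";")) {
               if (entry.isEmpty()) {
                   continue; // empty input yields an empty state
               }
               String[] kv = entry.split("=", 2);
               result.put(kv[0], Integer.parseInt(kv[1]));
           }
           return new ParallelismOverrides(result);
       }
   }
   ```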



Review Comment:
   Hi @gyfora @mxm, because of some offline comments and for ease of review, I split the decoupling into FLINK-33097 and FLINK-33098. Of course, if you think they should be merged into one PR, I will continue in this PR with multiple commits.
   
   @Samrat002 proposed 2 improvements to the `StateStore`.
   
   ## 1. Using a structured class instead of `String`
   
   A structured class is clearer than a String.
   
   First of all, we define the `SerializableState` interface to abstract `serialize` and `deserialize`.
   
   ```
       interface SerializableState<State extends SerializableState<State>> {
   
           String serialize();
   
           State deserialize(String serializedResult);
       }
   ```
   
   Then we define a `ScalingHistory` class that implements `SerializableState`.
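   
   A rough sketch of what such a class could look like. The `ScalingHistory` below is a hypothetical, heavily simplified stand-in (vertex id to parallelism, encoded as `id=value;id=value`, assuming ids contain neither `;` nor `=`); the real scaling history in the PR is a richer structure.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   class SerializableStateSketch {
   
       interface SerializableState<S extends SerializableState<S>> {
           String serialize();
   
           S deserialize(String serializedResult);
       }
   
       /** Hypothetical simplified history: vertex id mapped to its last parallelism. */
       static final class ScalingHistory implements SerializableState<ScalingHistory> {
           final TreeMap<String, Integer> parallelismByVertex = new TreeMap<>();
   
           @Override
           public String serialize() {
               StringBuilder sb = new StringBuilder();
               for (Map.Entry<String, Integer> e : parallelismByVertex.entrySet()) {
                   if (sb.length() > 0) {
                       sb.append(';');
                   }
                   sb.append(e.getKey()).append('=').append(e.getValue());
               }
               return sb.toString();
           }
   
           @Override
           public ScalingHistory deserialize(String serializedResult) {
               // Instance method: the caller needs some ScalingHistory object to call this on.
               ScalingHistory result = new ScalingHistory();
               if (!serializedResult.isEmpty()) {
                   for (String pair : serializedResult.split(";")) {
                       String[] kv = pair.split("=", 2);
                       result.parallelismByVertex.put(kv[0], Integer.parseInt(kv[1]));
                   }
               }
               return result;
           }
       }
   }
   ```
   
   Because `deserialize` is an instance method, restoring a history means writing `new ScalingHistory().deserialize(serialized)`, i.e. an empty object has to be created first.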
   
   
   ## 2. Using `byte[]` instead of `String` as the serialized result
   
   Reason: In the future there may be some complex state objects that cannot be serialized to String.
   
   
   Hi @Samrat002 , please correct me if my description is wrong, thanks~





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354365364


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());
+
+        CommonStatus<?> status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
   Let's not change this as part of this PR but feel free to open a JIRA for this





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354589163


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   Hm, thinking about it: aren't we adding an extra layer of complexity here? Let's say there's a CVE in the dependency that requires us to update the version. Relying on Flink's shaded dependency would mean that we have to wait for `flink-shaded` to be updated.
   
   Taking one step back, isn't the question: do we need to shade the jackson dependency?
   If the answer is yes, shouldn't the Kubernetes Operator shade its own dependency, so that it is independent of `flink-shaded` releases and can react faster when those dependencies have to be updated?
   If the answer is no, we could just use the dependency as is and wouldn't need to worry about excluding anything from Flink (because Flink's dependency is already shaded).
   
   It's the same approach we want to take for external connectors, as far as I understand. And if I understand the motivation correctly, shading would be necessary so that downstream projects (i.e. custom autoscaler implementations) can manage their own dependency versions. Does that make sense?





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356605280


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Fine with me. Thanks for the explanation!





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+    public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
+    public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+    private static String deprecatedOperatorConfigKey(String key) {
+        return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+    }
+
+    private static String autoScalerConfigKey(String key) {
+        return AUTOSCALER_CONF_PREFIX + key;
+    }
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return ConfigOptions.key(autoScalerConfigKey(key));
     }
 
     public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
             autoScalerConfig("enabled")
                     .booleanType()
                     .defaultValue(false)
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   Added:
   > Note: The option prefix `kubernetes.operator.` was removed in FLIP-334, because the autoscaler module was decoupled from flink-kubernetes-operator.
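   
   To illustrate what the deprecated keys are meant to achieve, here is a stdlib-only sketch of the fallback lookup. It is not Flink's `Configuration` implementation; `lookup` is a hypothetical helper, and it assumes the new key wins when both the new and the deprecated key are set.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   
   class DeprecatedKeyLookupSketch {
   
       static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
       static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
   
       /** Hypothetical helper: resolve an autoscaler option, falling back to the old prefix. */
       static Optional<String> lookup(Map<String, String> conf, String key) {
           String current = AUTOSCALER_CONF_PREFIX + key;
           String deprecated = DEPRECATED_K8S_OP_CONF_PREFIX + current;
           // Try the new key first, then the deprecated one (assumed precedence).
           for (String candidate : List.of(current, deprecated)) {
               String value = conf.get(candidate);
               if (value != null) {
                   return Optional.of(value);
               }
           }
           return Optional.empty();
       }
   }
   ```
   
   With this behaviour, an existing configuration that still sets `kubernetes.operator.job.autoscaler.enabled` keeps working after the prefix is dropped.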
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";

Review Comment:
   It's used in `BacklogBasedScalingTest`.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        AutoScalerOptions
+                                                                                .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   Sorry, I didn't understand this comment. I didn't change this code; it was moved from `AutoScalerInfo` [1].
   
   Do you mean this `tailMap` cannot be used directly, and we should take the tail and copy it into a new `TreeMap`?
   
   
   [1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java#L130
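   
   For context on the `tailMap` question, one property that may be relevant is that `TreeMap.tailMap` returns a view backed by the original map rather than a copy. The sketch below uses made-up integer keys instead of `Instant` and is not taken from the PR.
   
   ```java
   import java.util.SortedMap;
   import java.util.TreeMap;
   
   class TailMapViewSketch {
   
       /** Returns the sizes of {original, tailMap view, copied TreeMap} after a later insert. */
       static int[] sizes() {
           TreeMap<Integer, String> history = new TreeMap<>();
           history.put(1, "a");
           history.put(5, "b");
           history.put(9, "c");
   
           // View of all entries with key >= 5; still backed by 'history'.
           SortedMap<Integer, String> view = history.tailMap(5);
           // Independent snapshot of the same entries.
           SortedMap<Integer, String> copy = new TreeMap<>(view);
   
           // A later change to the original shows up in the view but not in the copy.
           history.put(7, "d");
   
           return new int[] {history.size(), view.size(), copy.size()};
       }
   }
   ```
   
   Since the view keeps a reference to the full backing map, entries older than the cut-off remain reachable unless the tail is copied into a new map.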



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   I didn't use Flink's `client-timeout` option because it applies to the Flink client process, for example when the Flink client starts a job.
   
   This is the Flink REST client timeout. Before this PR it used `org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT`. In this PR I call `conf.set(AutoScalerOptions.FLINK_CLIENT_TIMEOUT, getOperatorConfig().getFlinkClientTimeout());` in `createJobAutoScalerContext`.
   
   The new option is similar to `OPERATOR_FLINK_CLIENT_TIMEOUT`, and its default value is 10s. That's why I added an option here.
   
   If it's not clear, how about renaming it to `flink.rest-client.timeout`?
   



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R request) {
+                        if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap =
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   Sorry, what's the `left-over` here? I just copied it from the old code. 😂



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,7 +155,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
     }
 
     @VisibleForTesting
-    protected static void computeProcessingRateThresholds(
+    public static void computeProcessingRateThresholds(

Review Comment:
   My mistake, reverted.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R request) {
+                        if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap =
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   Sorry, what is the `left-over` here? I just copied it from the old code. 😂



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -38,42 +36,42 @@
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
-public class JobVertexScaler {
+public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);
 
+    @VisibleForTesting public static final String INEFFECTIVE_SCALING = "IneffectiveScaling";

Review Comment:
   It's used in `JobVertexScalerTest`, so I changed its visibility to `protected`.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
-        restart(now);
-
-        // after restart while the job is not running the evaluated metrics are gone
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(3, getOrCreateInfo(app, kubernetesClient).getMetricHistory().size());
-        assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
-        scaledParallelism = ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
-        assertEquals(4, scaledParallelism.get(source));
-        assertEquals(4, scaledParallelism.get(sink));
-
-        now = now.plus(Duration.ofSeconds(1));
-        setClocksTo(now);
-        running(now);
 
-        // once the job is running we got back the evaluated metric except the recommended
-        // parallelisms (until the metric window is full again)
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(1, getOrCreateInfo(app, kubernetesClient).getMetricHistory().size());
+        autoscaler.scale(context);
+        assertEvaluatedMetricsSize(1);
         assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
         assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
-        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
-        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
-        scaledParallelism = ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
+        assertEquals(4., getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.get(source));
         assertEquals(4, scaledParallelism.get(sink));
     }
 
+    private void assertEvaluatedMetricsSize(int expectedSize) {
+        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+                stateStore.getEvaluatedMetrics(context);
+        assertThat(evaluatedMetricsOpt).isPresent();
+        assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+    }
+
     private Double getCurrentMetricValue(JobVertexID jobVertexID, ScalingMetric scalingMetric) {
         var metric =
                 autoscaler
                         .lastEvaluatedMetrics
-                        .get(ResourceID.fromResource(app))
+                        .get(context.getJobKey())
                         .get(jobVertexID)
                         .get(scalingMetric);
         return metric == null ? null : metric.getCurrent();
     }
 
-    private void restart(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
-    }
-
-    private void running(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
-    }
-

Review Comment:
   This comment raises the same question as the previous one, so I'll reply to both together.
   
   The reason is that the `waitingForRunning` logic has been moved from the `autoscaler` module to `org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler#waitingForRunning`[1].
   
   This logic is no longer part of the generic `autoscaler`. I will move this test to the `flink-kubernetes-operator` module later.
   
   [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1341955135
   





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355181704


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R request) {
+                        if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap =
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   We should probably remove the println anyway, even if we copy this over.
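   
   A minimal sketch of what an assertion in place of the println could look like. It assumes the test's intent, as its name suggests, is that the `pendingRecords` values of one source are combined and unrelated metrics are ignored. `PendingRecordsAggregation` and `sumPendingRecords` are hypothetical stand-ins and not part of this PR; the real test would assert on `jobVertexIDMapMap` with the JUnit assertions it already imports.
   
   ```java
   import java.util.Map;

   /** Hypothetical stand-in for the collector under test; not code from the PR. */
   public class PendingRecordsAggregation {

       /** Sums the values of all metrics whose name ends with ".pendingRecords". */
       public static double sumPendingRecords(Map<String, Double> metricSums) {
           double total = 0;
           for (Map.Entry<String, Double> entry : metricSums.entrySet()) {
               if (entry.getKey().endsWith(".pendingRecords")) {
                   total += entry.getValue();
               }
           }
           return total;
       }

       public static void main(String[] args) {
           // Same shape as the mocked REST response in the test: two pendingRecords
           // metrics and one unrelated metric, each with a sum of 100.
           Map<String, Double> response =
                   Map.of(
                           "a.pendingRecords", 100.,
                           "b.pendingRecords", 100.,
                           "c.unrelated", 100.);

           double total = sumPendingRecords(response);

           // Fail loudly instead of printing the result and inspecting it by eye.
           if (total != 200.) {
               throw new AssertionError("expected 200.0 but was " + total);
           }
       }
   }
   ```
   
   The point is only that a test which prints its result cannot fail; an explicit check on the aggregated value can.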





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   Do we need to expose this as a configuration or can we just use the default? 
   
   I think it is better to let the user configure standard Flink configs as listed here: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+    public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
+    public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+    private static String deprecatedOperatorConfigKey(String key) {
+        return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+    }
+
+    private static String autoScalerConfigKey(String key) {
+        return AUTOSCALER_CONF_PREFIX + key;
+    }
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return ConfigOptions.key(autoScalerConfigKey(key));
     }
 
     public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
             autoScalerConfig("enabled")
                     .booleanType()
                     .defaultValue(false)
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   This might confuse some existing users because the deprecated keys will not appear on the configuration page. Can we add a note on the configuration page that we renamed the configuration prefix?
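   
   For readers unfamiliar with the fallback behaviour, here is a minimal sketch of how a renamed key with a deprecated alias is typically resolved: the new key is read first, and the deprecated key is consulted only if the new one is absent. This is an illustration in plain Java, not Flink's `ConfigOption` implementation; `DeprecatedKeyLookup` and `resolve` are hypothetical names, and only the two prefixes are taken from the diff above.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   /** Hypothetical illustration of deprecated-key fallback; not Flink code. */
   public class DeprecatedKeyLookup {

       // Prefixes as they appear in the diff above.
       public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
       public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";

       /** Returns the value of the new key, else of the first deprecated key that is set. */
       public static Optional<String> resolve(
               Map<String, String> conf, String key, List<String> deprecatedKeys) {
           String value = conf.get(key);
           if (value != null) {
               return Optional.of(value);
           }
           for (String deprecatedKey : deprecatedKeys) {
               String deprecatedValue = conf.get(deprecatedKey);
               if (deprecatedValue != null) {
                   return Optional.of(deprecatedValue);
               }
           }
           return Optional.empty();
       }

       public static void main(String[] args) {
           String newKey = AUTOSCALER_CONF_PREFIX + "enabled";
           String oldKey = DEPRECATED_K8S_OP_CONF_PREFIX + newKey;

           // An existing user who still sets only the old, prefixed key.
           Map<String, String> legacyConf = Map.of(oldKey, "true");
           System.out.println(resolve(legacyConf, newKey, List.of(oldKey)));

           // If both keys are set, the new key takes precedence.
           Map<String, String> mixedConf = Map.of(oldKey, "true", newKey, "false");
           System.out.println(resolve(mixedConf, newKey, List.of(oldKey)));
       }
   }
   ```
   
   Existing configurations keep working under this scheme, but the old key is no longer visible in the generated documentation, which is why a note about the renamed prefix helps.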



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -38,42 +36,42 @@
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
-public class JobVertexScaler {
+public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);
 
+    @VisibleForTesting public static final String INEFFECTIVE_SCALING = "IneffectiveScaling";

Review Comment:
   Can we inline this? It isn't used in tests either.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java:
##########
@@ -41,159 +37,104 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
+import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
+import static org.apache.flink.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for vertex parallelism scaler logic. */
-@EnableKubernetesMockClient(crud = true)
 public class JobVertexScalerTest {
 
-    private JobVertexScaler vertexScaler;
-    private Configuration conf;
-
-    private KubernetesClient kubernetesClient;
-    private EventCollector eventCollector;
-
-    private FlinkDeployment flinkDep;
+    private EventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
+    private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
+    private JobAutoScalerContext<JobID> context;
 
     @BeforeEach
     public void setup() {
-        flinkDep = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(flinkDep).createOrReplace();
-        eventCollector = new EventCollector();
-        vertexScaler = new JobVertexScaler(new EventRecorder(eventCollector));
-        conf = new Configuration();
+        eventCollector = new EventCollector<>();
+        vertexScaler = new JobVertexScaler<>(eventCollector);
+        var conf = new Configuration();
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        JobID jobID = new JobID();
+        context =
+                new JobAutoScalerContext<>(
+                        jobID, jobID, conf, new UnregisteredMetricsGroup(), null);
     }
 
     @Test
     public void testParallelismScaling() {
         var op = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        context.getConfiguration().set(AutoScalerOptions.TARGET_UTILIZATION, 1.);

Review Comment:
   Can we re-add `conf` above and initialize it with `context.getConfiguration()`?



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements AutoScalerEventHandler<KEY, Context> {
+
+    public final LinkedList<Event<KEY, Context>> events = new LinkedList<>();
+
+    public final Map<String, Event<KEY, Context>> eventMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey) {
+        String eventKey =
+                generateEventKey(context, type, reason, messageKey != null ? messageKey : message);
+        Event<KEY, Context> event = eventMap.get(eventKey);
+        if (event == null) {
+            Event<KEY, Context> newEvent = new Event<>(context, type, reason, message, messageKey);
+            events.add(newEvent);
+            eventMap.put(eventKey, newEvent);
+        } else {
+            event.incrementCount();
+            events.add(event);
+        }
+    }
+
+    private String generateEventKey(Context context, Type type, String reason, String message) {
+        return context.getJobID() + type.name() + reason + message;
+    }
+
+    /** The collected event. */
+    public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+        private final Context context;
+
+        private final Type type;
+
+        private final String reason;
+
+        private final String message;
+
+        @Nullable private final String messageKey;
+
+        private int count;
+
+        public Event(
+                Context context,
+                Type type,
+                String reason,
+                String message,
+                @Nullable String messageKey) {
+            this.context = context;
+            this.type = type;
+            this.reason = reason;
+            this.message = message;
+            this.messageKey = messageKey;
+            this.count = 1;
+        }
+
+        public Context getContext() {
+            return context;
+        }
+
+        public Type getType() {
+            return type;
+        }
+
+        public String getReason() {
+            return reason;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        @Nullable
+        public String getMessageKey() {
+            return messageKey;
+        }
+
+        private void incrementCount() {
+            count++;
+        }
+
+        public int getCount() {
+            return count;
+        }

Review Comment:
   Those all can be replaced by `@Getter` on the corresponding fields (except incrementCount).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with String type. */

Review Comment:
   ```suggestion
   /** The ConfigMapStore persists state in Kubernetes ConfigMaps */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;
+        }
+        resourceScaler.scale(autoScalerContext);

Review Comment:
   ```suggestion
           if (autoscalerDisabled(ctx)) {
               autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
               resourceScaler.scale(autoScalerContext);
           } else if (waitingForRunning(ctx.getResource().getStatus())) {
               LOG.info("Autoscaler is waiting for stable, running state");
               resourceScaler.cleanup(autoScalerContext.getJobKey());
           } else {
               resourceScaler.scale(autoScalerContext);
           }
   ```
   
   Much easier to read for me.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+/** The kubernetes job autoscaler context. */

Review Comment:
   ```suggestion
   /** An implementation of JobAutoscalerContext for Kubernetes. */
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+/**
+ * The state store is responsible for store all states during scaling.

Review Comment:
   ```suggestion
    * The state store is responsible for storing all state during scaling.
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler.deployment;
+package org.apache.flink.autoscaler.event;
 
-import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-/** An autoscaler implementation which does nothing. */
-public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, JobAutoScaler {
+import javax.annotation.Nullable;
 
-    @Override
-    public JobAutoScaler create(EventRecorder eventRecorder) {
-        return this;
-    }
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
-    @Override
-    public void scale(FlinkResourceContext<?> ctx) {}
+    void handleEvent(
+            Context context, Type type, String reason, String message, @Nullable String messageKey);
 
-    @Override
-    public void cleanup(FlinkResourceContext<?> ctx) {}
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Warning

Review Comment:
   What about `Error`?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,7 +155,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
     }
 
     @VisibleForTesting
-    protected static void computeProcessingRateThresholds(
+    public static void computeProcessingRateThresholds(

Review Comment:
   Any reason for changing the visibility here? I couldn't find why this was required.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";

Review Comment:
   Can we inline this constant? This isn't used in tests either.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
-        restart(now);
-
-        // after restart while the job is not running the evaluated metrics are gone
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(3, getOrCreateInfo(app, kubernetesClient).getMetricHistory().size());
-        assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
-        scaledParallelism = ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
-        assertEquals(4, scaledParallelism.get(source));
-        assertEquals(4, scaledParallelism.get(sink));
-

Review Comment:
   Why did we remove this check?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(
+            Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
+            throws Exception;
+
+    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
+            Context jobContext) throws Exception;
+
+    void removeScalingHistory(Context jobContext) throws Exception;
+
+    void storeEvaluatedMetrics(
+            Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics)
+            throws Exception;
+
+    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext)
+            throws Exception;
+
+    void removeEvaluatedMetrics(Context jobContext) throws Exception;
+
+    void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides)
+            throws Exception;
+
+    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception;
+
+    void removeParallelismOverrides(Context jobContext) throws Exception;
+
+    /**
+     * The flush is needed because we just save data in cache for all store methods, and flush these
+     * data to the physical storage after the flush method is called to improve the performance.

Review Comment:
   ```suggestion
        * Flushing is needed because all store methods only save data in the cache.
        * To reduce the number of write operations, we flush the cached data to the
        * physical storage only after all operations have been performed.
   ```
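
To make the caching contract in this Javadoc concrete, here is a minimal sketch of a store that buffers writes and persists them only on flush. Everything in it is hypothetical: `CachedStateStoreSketch`, its string keys, and the in-memory `backend` map (standing in for the physical storage) are illustrations, not the `AutoScalerStateStore` API from this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: store/remove touch the cache, flush performs one backend write. */
final class CachedStateStoreSketch {

    private final Map<String, String> cache = new HashMap<>();
    // Hypothetical stand-in for the physical storage (e.g. a ConfigMap).
    private final Map<String, String> backend = new HashMap<>();
    private boolean dirty;
    private int backendWrites;

    void store(String key, String value) {
        cache.put(key, value);
        dirty = true;
    }

    void remove(String key) {
        if (cache.remove(key) != null) {
            dirty = true;
        }
    }

    /** Persists the cached state in a single write; a no-op when nothing changed. */
    void flush() {
        if (!dirty) {
            return;
        }
        backend.clear();
        backend.putAll(cache);
        backendWrites++;
        dirty = false;
    }

    int backendWrites() {
        return backendWrites;
    }

    Map<String, String> persisted() {
        return Map.copyOf(backend);
    }

    public static void main(String[] args) {
        CachedStateStoreSketch store = new CachedStateStoreSketch();
        store.store("scalingHistory", "h1");
        store.store("evaluatedMetrics", "m1");
        store.remove("scalingHistory");
        store.flush();
        System.out.println("backend writes: " + store.backendWrites());
        System.out.println("persisted: " + store.persisted());
    }
}
```

With this shape, several store and remove calls in one scaling pass cost a single write, which is the benefit the suggested wording describes.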



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        AutoScalerOptions
+                                                                                .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   I think this has caused issues in the past. It is better to either use a new TreeMap (instead of a sub-map view) or clear the old entries like this:
   
   ```java
                                                   .headMap(
                                                           now.minus(
                                                                   conf.get(
                                                                           AutoScalerOptions
                                                                                   .VERTEX_SCALING_HISTORY_AGE)))
                                                           .clear();
   ```
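
For reference, a small standalone sketch of the difference between the two approaches. It uses only `java.util` and `java.time`; the class and method names are made up for illustration, and which concrete issue occurred in the past is not stated here, so the range restriction shown in `main` is only one plausible example of how a view can surprise callers.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

/** Illustrative only: contrasts a tailMap view with in-place trimming and with a copy. */
final class ScalingHistoryTrimSketch {

    /** Deletes entries strictly older than (now - maxAge) from the map itself. */
    static <V> void trimInPlace(SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        // headMap is a view of the keys below the cutoff; clearing it removes
        // those entries from the backing map.
        history.headMap(now.minus(maxAge)).clear();
    }

    /** Copies the retained entries into an independent TreeMap. */
    static <V> SortedMap<Instant, V> trimmedCopy(
            SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        return new TreeMap<>(history.tailMap(now.minus(maxAge)));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-09-18T08:00:00Z");
        Duration maxAge = Duration.ofHours(1);

        SortedMap<Instant, String> history = new TreeMap<>();
        history.put(now.minus(Duration.ofHours(2)), "old");
        history.put(now.minus(Duration.ofMinutes(5)), "recent");

        // A tailMap view still references the full backing map (old entries
        // included) and rejects keys outside its range.
        SortedMap<Instant, String> view = history.tailMap(now.minus(maxAge));
        try {
            view.put(now.minus(Duration.ofHours(3)), "too old");
        } catch (IllegalArgumentException e) {
            System.out.println("view rejected out-of-range key");
        }

        trimInPlace(history, now, maxAge);
        System.out.println("entries after trim: " + history.size());
    }
}
```

Either `trimInPlace` or `trimmedCopy` leaves the stored value as a plain `TreeMap` without a hidden reference to expired entries.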



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
-        restart(now);
-
-        // after restart while the job is not running the evaluated metrics are gone
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(3, getOrCreateInfo(app, kubernetesClient).getMetricHistory().size());
-        assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
-        scaledParallelism = ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
-        assertEquals(4, scaledParallelism.get(source));
-        assertEquals(4, scaledParallelism.get(sink));
-
-        now = now.plus(Duration.ofSeconds(1));
-        setClocksTo(now);
-        running(now);
 
-        // once the job is running we got back the evaluated metric except the recommended
-        // parallelisms (until the metric window is full again)
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(1, getOrCreateInfo(app, kubernetesClient).getMetricHistory().size());
+        autoscaler.scale(context);
+        assertEvaluatedMetricsSize(1);
         assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
         assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
-        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
-        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
-        scaledParallelism = ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
+        assertEquals(4., getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.get(source));
         assertEquals(4, scaledParallelism.get(sink));
     }
 
+    private void assertEvaluatedMetricsSize(int expectedSize) {
+        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+                stateStore.getEvaluatedMetrics(context);
+        assertThat(evaluatedMetricsOpt).isPresent();
+        assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+    }
+
     private Double getCurrentMetricValue(JobVertexID jobVertexID, ScalingMetric scalingMetric) {
         var metric =
                 autoscaler
                         .lastEvaluatedMetrics
-                        .get(ResourceID.fromResource(app))
+                        .get(context.getJobKey())
                         .get(jobVertexID)
                         .get(scalingMetric);
         return metric == null ? null : metric.getCurrent();
     }
 
-    private void restart(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
-    }
-
-    private void running(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
-    }
-

Review Comment:
   Why did we remove the Job status changes and only set the job update time above?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java:
##########
@@ -17,41 +17,37 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
-import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ServiceLoader;
-
-/** Loads the active Autoscaler implementation from the classpath. */
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.autoscaler.RestApiMetricsCollector;
+import org.apache.flink.autoscaler.ScalingExecutor;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.kubernetes.operator.autoscaler.ConfigMapStore;
+import org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerEventHandler;
+import org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore;
+import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
+import org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+/** Loads the Autoscaler implementation. */
 public class AutoscalerLoader {

Review Comment:
   Given that nothing is loaded dynamically anymore, this class should be removed or renamed to `AutoscalerFactory`.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/ScalingRealizerCollector.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */

Review Comment:
   ```suggestion
   /** The event handler for collecting scaling events. */
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            runScalingLogic(ctx, autoscalerMetrics);
+        } catch (Throwable e) {
+            onError(ctx, autoscalerMetrics, e);
+        } finally {
+            applyParallelismOverrides(ctx);
+        }
+    }
+
+    @Override
+    public void cleanup(KEY jobKey) {
+        LOG.info("Cleaning up autoscaling meta data");
+        metricsCollector.cleanup(jobKey);
+        lastEvaluatedMetrics.remove(jobKey);
+        flinkMetrics.remove(jobKey);
+        stateStore.removeInfoFromCache(jobKey);
+    }
+
+    private void clearParallelismOverrides(Context ctx) throws Exception {
+        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
+        if (parallelismOverrides.isPresent()) {
+            stateStore.removeParallelismOverrides(ctx);
+            stateStore.flush(ctx);
+        }
+    }
+
+    @VisibleForTesting
+    protected Optional<Map<String, String>> getParallelismOverrides(Context ctx) throws Exception {
+        return stateStore.getParallelismOverrides(ctx);
+    }
+
+    /**
+     * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the
+     * scalingRealizer.
+     *
+     * @param ctx Job context
+     */
+    @VisibleForTesting
+    protected void applyParallelismOverrides(Context ctx) throws Exception {
+        var overridesOpt = getParallelismOverrides(ctx);
+        if (overridesOpt.isEmpty() || overridesOpt.get().isEmpty()) {
+            return;
+        }
+        Map<String, String> overrides = overridesOpt.get();
+        LOG.debug("Applying parallelism overrides: {}", overrides);
+
+        var conf = ctx.getConfiguration();
+        var userOverrides = new HashMap<>(conf.get(PipelineOptions.PARALLELISM_OVERRIDES));
+        var exclusions = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
+        overrides.forEach(
+                (k, v) -> {
+                    // Respect user override for excluded vertices
+                    if (exclusions.contains(k)) {
+                        userOverrides.putIfAbsent(k, v);
+                    } else {
+                        userOverrides.put(k, v);
+                    }
+                });
+        scalingRealizer.realize(ctx, userOverrides);
+    }
+
+    private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics)
+            throws Exception {
+
+        var conf = ctx.getConfiguration();
+        if (!conf.getBoolean(AUTOSCALER_ENABLED)) {
+            LOG.debug("Autoscaler is disabled");
+            clearParallelismOverrides(ctx);
+            return;
+        }
+
+        var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+
+        if (collectedMetrics.getMetricHistory().isEmpty()) {
+            stateStore.flush(ctx);
+            return;
+        }
+        LOG.debug("Collected metrics: {}", collectedMetrics);
+
+        var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+        LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
+        lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics);
+
+        initRecommendedParallelism(evaluatedMetrics);
+        autoscalerMetrics.registerScalingMetrics(
+                collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
+                () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
+
+        if (!collectedMetrics.isFullyCollected()) {
+            // We have done an upfront evaluation, but we are not ready for scaling.
+            resetRecommendedParallelism(evaluatedMetrics);
+            stateStore.flush(ctx);
+            return;
+        }
+
+        var parallelismChanged = scalingExecutor.scaleResource(ctx, evaluatedMetrics);
+
+        if (parallelismChanged) {
+            autoscalerMetrics.incrementScaling();
+        } else {
+            autoscalerMetrics.incrementBalanced();
+        }
+
+        stateStore.flush(ctx);

Review Comment:
   Can we run the flush once after calling this method, right after line 85 (the `runScalingLogic` call in `scale`), instead of flushing on every exit path?
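
A rough sketch of what a single flush point could look like. `SingleFlushSketch` and `CountingStore` are hypothetical stand-ins that only count flush calls; this is not the actual `JobAutoScalerImpl` code, and the real early-return conditions are reduced to two booleans.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: one flush in scale() covers every exit path of the scaling logic. */
final class SingleFlushSketch {

    /** Hypothetical stand-in for the state store; it only counts flush calls. */
    static final class CountingStore {
        final AtomicInteger flushes = new AtomicInteger();

        void flush() {
            flushes.incrementAndGet();
        }
    }

    private final CountingStore store;

    SingleFlushSketch(CountingStore store) {
        this.store = store;
    }

    void scale(boolean metricsEmpty, boolean fullyCollected) {
        try {
            runScalingLogic(metricsEmpty, fullyCollected);
            // Single flush point: reached after every normal return below.
            store.flush();
        } catch (Throwable t) {
            System.err.println("scaling failed: " + t);
        }
    }

    private void runScalingLogic(boolean metricsEmpty, boolean fullyCollected) {
        if (metricsEmpty) {
            return; // previously: flush, then return
        }
        if (!fullyCollected) {
            return; // previously: flush, then return
        }
        // The scaling decision would happen here.
    }

    public static void main(String[] args) {
        CountingStore store = new CountingStore();
        SingleFlushSketch scaler = new SingleFlushSketch(store);
        scaler.scale(true, false);
        scaler.scale(false, false);
        scaler.scale(false, true);
        System.out.println("flushes: " + store.flushes.get());
    }
}
```

One thing to check before doing this in the real class: the disabled-autoscaler path currently flushes only when overrides were present, so an unconditional flush after the call would slightly change that behavior.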



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R request) {
+                        if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap =
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   Println left-over?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.SneakyThrows;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.LoaderOptions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/** The kubernetes autoscaler state store, it's based on the config map. */

Review Comment:
   ```suggestion
   /** An AutoscalerStateStore which persists its state in Kubernetes ConfigMaps. */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** The kubernetes autoscaler event handler, it's based on the {@link EventRecorder}. */

Review Comment:
   ```suggestion
   /** An event handler which posts events to the Kubernetes events API. */
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */

Review Comment:
   ```suggestion
   /** The event handler for collecting events */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +50,36 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    /**
+     * Get or create the autoscaler context.
+     *
+     * @return autoScalerContext.
+     */

Review Comment:
   ```suggestion
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/FlushCountableAutoscalerStateStore.java:
##########
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.state;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import lombok.SneakyThrows;
+/** The state store counts the flush. */
+public class FlushCountableAutoscalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
+        extends InMemoryAutoScalerStateStore<KEY, Context> {
 
-/** Autoscaler test utilities. * */
-public class AutoscalerTestUtils {
+    private int flushCount = 0;

Review Comment:
   ```suggestion
       private int flushCount;
   ```
   
   Int fields are zero by default.
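
To illustrate the point about default values, here is a minimal sketch. The class name `FlushCounterSketch` is hypothetical and is not the class from the PR; it only shows that an `int` instance field without an initializer starts at zero.

```java
/** Minimal sketch: a counter field that relies on Java's default initialization. */
class FlushCounterSketch {
    // No explicit "= 0": instance fields of type int are initialized to 0.
    private int flushCount;

    void flush() {
        flushCount++;
    }

    int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        FlushCounterSketch counter = new FlushCounterSketch();
        System.out.println(counter.getFlushCount()); // value before any flush
        counter.flush();
        System.out.println(counter.getFlushCount()); // value after one flush
    }
}
```

Note that this only applies to fields; local variables have no default and must be assigned before use.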



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java:
##########
@@ -17,24 +17,26 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
-import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.PipelineOptions;
 
-import com.google.auto.service.AutoService;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import java.util.Map;
+
+/** The kubernetes scaling realizer. */

Review Comment:
   ```suggestion
   /** The Kubernetes implementation for applying parallelism overrides. */
   ```



##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   Yep, let's use the version bundled with the client.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   We can internally reset the state, but the effective overrides on the deployment should not be cleared during the waiting period. 





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1342082710


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   Thanks for pointing it out.
   
   If `applyParallelismOverrides` is expected here, we can call the related code directly. `applyParallelismOverrides` only calls `stateStore.getParallelismOverrides(ctx)` and `scalingRealizer.realize(ctx, userOverrides)`, and both `stateStore` and `scalingRealizer` are reachable here.
   
   I plan to extract `org.apache.flink.autoscaler.JobAutoScalerImpl#applyParallelismOverrides` into a static method, WDYT?
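
A rough sketch of what such a static helper could look like follows. The interfaces `OverrideStore` and `OverrideRealizer` are hypothetical, simplified stand-ins for the state store and scaling realizer, the `Map<String, String>` override type is an assumption, and the empty-overrides guard is an illustrative choice rather than the behavior of `JobAutoScalerImpl`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the state store. */
interface OverrideStore<C> {
    Map<String, String> getParallelismOverrides(C ctx);
}

/** Hypothetical, simplified stand-in for the scaling realizer. */
interface OverrideRealizer<C> {
    void realize(C ctx, Map<String, String> overrides);
}

final class ParallelismOverridesSketch {
    private ParallelismOverridesSketch() {}

    /** Reads the stored overrides and re-applies them; no other state is touched. */
    static <C> Map<String, String> applyParallelismOverrides(
            C ctx, OverrideStore<C> stateStore, OverrideRealizer<C> scalingRealizer) {
        Map<String, String> overrides = stateStore.getParallelismOverrides(ctx);
        if (overrides == null || overrides.isEmpty()) {
            // Assumption for this sketch: nothing stored means nothing to realize.
            return Map.of();
        }
        scalingRealizer.realize(ctx, overrides);
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> applied = new HashMap<>();
        applyParallelismOverrides(
                "job-1", ctx -> Map.of("vertex-a", "4"), (ctx, o) -> applied.putAll(o));
        System.out.println(applied);
    }
}
```

Because the helper takes the store and realizer as parameters, a caller in the waiting branch could re-apply overrides without going through the full scaling path.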



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with String type. */
+public class ConfigMapStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
+
+    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+    private final KubernetesClient kubernetesClient;
+
+    /**
+     * The cache for each resourceId may be in three situations: 1. The resourceId isn't exist :
+     * ConfigMap isn't loaded from kubernetes, or it's removed. 2. The resourceId is exist, and
+     * value is the Optional.empty() : We have loaded the ConfigMap from kubernetes, but the
+     * ConfigMap isn't created at kubernetes side. 3. The resourceId is exist, and the Optional
+     * isn't empty : We have loaded the ConfigMap from kubernetes, it may be not same with
+     * kubernetes side due to it's not flushed after updating.
+     */
+    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
+            new ConcurrentHashMap<>();
+
+    public ConfigMapStore(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    protected void putSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key, String value) {
+        getOrCreateState(jobContext).put(key, value);
+    }
+
+    protected Optional<String> getSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key) {
+        return getConfigMap(jobContext).map(configMap -> configMap.getData().get(key));
+    }
+
+    protected void removeSerializedState(KubernetesJobAutoScalerContext jobContext, String key) {
+        getConfigMap(jobContext)
+                .ifPresentOrElse(
+                        configMap -> configMap.getData().remove(key),
+                        () -> {
+                            throw new IllegalStateException(
+                                    "The configMap isn't created, so the remove is unavailable.");
+                        });
+    }
+
+    public void flush(KubernetesJobAutoScalerContext jobContext) {
+        Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
+        Preconditions.checkState(

Review Comment:
   Some callers call `flush` at the end even if the state hasn't been updated, so your suggestion makes things easier for callers.
   
   Updated.
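
As an illustration of a `flush` that tolerates callers with nothing to write, here is a minimal sketch. `TolerantFlushStoreSketch`, its string job keys, and the in-memory `remote` map are all hypothetical; the real store in the PR writes a Kubernetes ConfigMap and its exact behavior is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a cache whose flush is a no-op when nothing is cached. */
class TolerantFlushStoreSketch {
    // Local, not-yet-flushed state per job key.
    private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
    // In-memory stand-in for the remote side.
    private final Map<String, Map<String, String>> remote = new ConcurrentHashMap<>();

    void put(String jobKey, String key, String value) {
        cache.computeIfAbsent(jobKey, k -> new HashMap<>()).put(key, value);
    }

    /** Returns true if state was written, false if there was nothing to flush. */
    boolean flush(String jobKey) {
        Map<String, String> pending = cache.get(jobKey);
        if (pending == null) {
            // Nothing cached for this job: return quietly instead of throwing.
            return false;
        }
        remote.put(jobKey, new HashMap<>(pending));
        return true;
    }

    Map<String, String> remoteState(String jobKey) {
        return remote.getOrDefault(jobKey, Map.of());
    }
}
```

With this shape, a caller can always end with `flush` regardless of whether it touched the state.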





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343674332


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DefaultJobAutoScalerContext.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+/** The default job autoscaler context, the jobKey is JobID. */
+public class DefaultJobAutoScalerContext extends AbstractJobAutoScalerContext<JobID> {

Review Comment:
   Let's delete the class and simply have a non-abstract `JobAutoScalerContext<JobID>`.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AbstractJobAutoScalerContext.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+/**
+ * The abstract job autoscaler context.
+ *
+ * @param <KEY> The job key.
+ */
+public abstract class AbstractJobAutoScalerContext<KEY> implements JobAutoScalerContext<KEY> {

Review Comment:
   I would simply rename this to `JobAutoScalerContext<KEY>`, delete the constructor, make all fields final, and put `@Value` on it to generate the getters, `toString`, etc.
   
   We don't need interfaces for everything; that is too much boilerplate.
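
For readers unfamiliar with Lombok, the sketch below approximates by hand what a `@Value`-style class provides: a final class with private final fields, an all-arguments constructor, getters, and `equals`/`hashCode`/`toString`. The class name `JobContextSketch` and its two fields are illustrative assumptions, not the fields of the real context class.

```java
import java.util.Map;
import java.util.Objects;

/** Hand-written approximation of a Lombok @Value class; names are illustrative only. */
final class JobContextSketch<KEY> {
    private final KEY jobKey;
    private final Map<String, String> configuration;

    JobContextSketch(KEY jobKey, Map<String, String> configuration) {
        this.jobKey = jobKey;
        this.configuration = configuration;
    }

    KEY getJobKey() {
        return jobKey;
    }

    Map<String, String> getConfiguration() {
        return configuration;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof JobContextSketch)) {
            return false;
        }
        JobContextSketch<?> other = (JobContextSketch<?>) o;
        return Objects.equals(jobKey, other.jobKey)
                && Objects.equals(configuration, other.configuration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobKey, configuration);
    }

    @Override
    public String toString() {
        return "JobContextSketch(jobKey=" + jobKey + ", configuration=" + configuration + ")";
    }
}
```

With the annotation, everything except the field declarations would be generated, which is the boilerplate reduction the comment is after.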



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {

Review Comment:
   Let's delete the interface for now (see my previous comment). We can re-add it later if necessary, but I doubt it will be needed.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/HeapedAutoScalerStateStore.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The state store based on the Java Heap.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */
+public class HeapedAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Let's rename this `HeapedAutoScalerStateStore` -> `InMemoryAutoScalerStateStore`





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352333625


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     protected final EventRecorder eventRecorder;
     protected final StatusRecorder<CR, STATUS> statusRecorder;
-    protected final JobAutoScaler resourceScaler;
+    protected final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> resourceScaler;

Review Comment:
   done





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354382276


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());
+
+        CommonStatus<?> status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
   Sure. I didn't change it in this PR; I created [FLINK-33237](https://issues.apache.org/jira/browse/FLINK-33237) to optimize this afterwards.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355991932


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   Do you mean moving `KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here?
   
   It's a generic option for the Kubernetes operator. It is used not only by the autoscaler but also by many `AbstractFlinkService` methods, which is why I didn't move it here.
   
   WDYT?





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336584237


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   `flink-autoscaler` depends on `flink-runtime 1.17.1`, and `flink-runtime 1.17.1` depends on `flink-shaded-jackson 2.13.4-16.1`.
   
   If `flink-autoscaler` depended on `flink-shaded-jackson 2.14.2-17.0` directly, `2.13.4-16.1` and `2.14.2-17.0` could conflict, right?
   
   So there are 2 temporary solutions:
   
   - Solution 1: Use the unified jackson dependency of `flink-kubernetes-operator` (this is what the current PR does).
   - Solution 2: `flink-autoscaler` depends on `flink-shaded-jackson 2.14.2-17.0` directly and excludes `flink-shaded-jackson` from `flink-runtime`.
   
   In the short term, either is fine for me.
   
   Hi @gyfora @mxm, you are the experts on `flink-kubernetes-operator`: could the whole `flink-kubernetes-operator` use `flink-shaded-jackson 2.14.2-17.0`?
   
   If yes, this problem is easy to solve.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752727711

   > If it's still needed, I can restore the `flink-kubernetes-operator-autoscaler` module and move all kubernetes-autoscaler related classes into it. WDYT?
   
   Given that the autoscaler module is now decoupled from the operator version, it would make more sense than ever to have it pluggable. But given that the backends are not pluggable, the pluggability wouldn't be as useful anymore. I'm ok with removing the support for now if it proves difficult to maintain it.
   
   
   
   
   
   
   




[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336584237


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   Thanks for your reminder, I will update it in the subsequent commit.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient<String> getRestClusterClient() throws Exception;

Review Comment:
   >  I am wondering why we're relying on `RestClusterClient` here? Shouldn't we use the `ClusterClient` interface, instead, to keep it generic and improve testability?
   
   I checked: `ClusterClient` only offers some commonly used methods, such as `submitJob`, `getJobStatus`, `cancelJob` and a series of savepoint-related methods.
   
   The autoscaler uses a couple of calls that go through the REST API, such as:
   
   - [getJobTopology](https://github.com/apache/flink-kubernetes-operator/blob/662fa612a8ab352e43ab8a99fa61aadfbe41e4d7/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L106) 
   - [getJobDetailsInfo](https://github.com/apache/flink-kubernetes-operator/blob/662fa612a8ab352e43ab8a99fa61aadfbe41e4d7/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L97)
   - [Query generic metrics ](https://github.com/apache/flink-kubernetes-operator/blob/662fa612a8ab352e43ab8a99fa61aadfbe41e4d7/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L433)
   
   Also, the new `rescale rest api (FLIP-291)`[1] will be introduced in this FLIP. That's why we are using the `RestClusterClient` here.
   
   This information cannot be fetched through `ClusterClient`, right?
   
   Please correct me if I have misunderstood, thanks a lot.
   
   [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352290023


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   Preferably, I would like any logic related to applying parallelism inside the autoscaler implementation. This shouldn't change when the autoscaler is waiting for the running state. In fact, the job state checks should also be performed by the autoscaler, not by the reconciler. The current code mixes control over the parallelism overrides between the reconciler and the autoscaler.
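
The ownership split suggested above can be sketched as follows. This is a minimal illustration, not code from the PR: `Autoscaler`, `JobState` and the string vertex ids are hypothetical stand-ins, and the choice to drop overrides when the autoscaler is disabled is an assumption. The reconciler would call `scale` unconditionally, and the autoscaler alone decides what happens to the parallelism overrides in each job state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: hypothetical stand-ins, not classes from the PR. */
class AutoscalerOwnsStateCheckSketch {

    /** Simplified job states for illustration. */
    enum JobState { RUNNING, RESTARTING, FAILED }

    static final class Autoscaler {
        // Parallelism overrides are owned exclusively by the autoscaler.
        private final Map<String, Integer> overrides = new HashMap<>();

        /** Single entry point; the caller performs no state checks of its own. */
        Map<String, Integer> scale(
                boolean enabled, JobState state, Map<String, Integer> recommended) {
            if (!enabled) {
                // Assumption: disabling the autoscaler drops its overrides.
                overrides.clear();
            } else if (state == JobState.RUNNING) {
                // Only a running job yields new scaling decisions.
                overrides.putAll(recommended);
            }
            // Enabled but not RUNNING: previously applied overrides stay untouched.
            return Collections.unmodifiableMap(new HashMap<>(overrides));
        }
    }

    public static void main(String[] args) {
        Autoscaler autoscaler = new Autoscaler();
        Map<String, Integer> recommended = new HashMap<>();
        recommended.put("vertex-1", 4);

        System.out.println(autoscaler.scale(true, JobState.RUNNING, recommended));
        System.out.println(autoscaler.scale(true, JobState.RESTARTING, new HashMap<>()));
        System.out.println(autoscaler.scale(false, JobState.RUNNING, recommended));
    }
}
```

With this shape the reconciler no longer needs a `waitingForRunning` branch; whether cleanup of in-memory metrics should still happen in that state is left open here.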





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354557198


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient<String> getRestClusterClient() throws Exception;

Review Comment:
   Sorry for the delayed response (vacation and other things). I guess you're right: the `ClusterClient` interface doesn't cover all the methods provided by `RestClusterClient`. It feels like we would have to revisit `ClusterClient` to align things again. One workaround would be to provide a wrapper interface that only exposes the methods needed by the Kubernetes Operator. WDYT?
   
   My concern is that using a concrete implementation might make testing things harder.
   
   Btw. have we thought of relying on the OpenAPI definition to generate a REST client?
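
To make the wrapper idea above concrete, here is a minimal sketch. All names (`AutoscalerRestGateway`, `FakeGateway`, `readyForScaling`) are hypothetical and plain strings stand in for Flink types; nothing here is taken from the PR. It shows how code that depends only on a narrow interface can be exercised with a hand-written fake instead of a concrete REST client.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: every type here is a hypothetical stand-in, not a Flink class. */
class RestGatewaySketch {

    /** Narrow view exposing only the calls the autoscaler would need. */
    interface AutoscalerRestGateway {
        String getJobState(String jobId);

        List<String> getJobVertexIds(String jobId);
    }

    /** Test double that returns canned answers without any network access. */
    static final class FakeGateway implements AutoscalerRestGateway {
        private final String state;
        private final List<String> vertexIds;

        FakeGateway(String state, List<String> vertexIds) {
            this.state = state;
            this.vertexIds = vertexIds;
        }

        @Override
        public String getJobState(String jobId) {
            return state;
        }

        @Override
        public List<String> getJobVertexIds(String jobId) {
            return vertexIds;
        }
    }

    /** Logic under test depends on the narrow interface only. */
    static boolean readyForScaling(AutoscalerRestGateway gateway, String jobId) {
        return "RUNNING".equals(gateway.getJobState(jobId))
                && !gateway.getJobVertexIds(jobId).isEmpty();
    }

    public static void main(String[] args) {
        AutoscalerRestGateway gateway = new FakeGateway("RUNNING", Arrays.asList("v1", "v2"));
        System.out.println(readyForScaling(gateway, "job-1"));
    }
}
```

The cost is one more interface to maintain, which is the trade-off discussed elsewhere in this thread.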





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1359305741


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {
+
+    private final AbstractFlinkResource<?, ?> resource;
+
+    private final KubernetesClient kubernetesClient;
+
+    public KubernetesJobAutoScalerContext(
+            JobID jobID,

Review Comment:
   jobID should also be `@Nullable` 



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -441,15 +442,21 @@ protected Collection<AggregatedMetric> queryAggregatedMetricNames(
 
     protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
             queryAllAggregatedMetrics(
-                    AbstractFlinkResource<?, ?> cr,
-                    FlinkService flinkService,
-                    Configuration conf,
+                    Context ctx,
                     Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
 
-    public void cleanup(AbstractFlinkResource<?, ?> cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext<KEY> context, Duration clientTimeout) throws Exception {

Review Comment:
   I don't expect it to be fixed in this PR, just want to note that this is an abstract class with `RestApiMetricsCollector` extending it. This makes it seem like it would be possible to extend this class to be able to collect metrics via methods other than REST API.
   
   This would already be a challenge as there are several REST API calls already in this class, but with this PR, we will also tie this class to REST API with `JobAutoScalerContext`.
   
   Perhaps it makes sense to remove `RestApiMetricsCollector` at this point?





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361898608


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());

Review Comment:
   @mxm also mentioned this; I answered in this comment [1].
   
   In brief, the operator has a client timeout config, however:
   - The `autoscaler` module cannot depend on operator code.
   - The operator's client timeout config is used in many places (not only the autoscaler), so moving it into the `autoscaler` module would be strange.
   
   [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361898608


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());

Review Comment:
   @mxm also mentioned this; I answered in this comment [1].
   
   In brief, the operator has a client timeout config, however:
   - The `autoscaler` module cannot depend on operator code.
   - The operator's client timeout config is used in many places (not only the autoscaler), so if we moved it into the `autoscaler` module, it would be strange for those other places to use the autoscaler client timeout.
   
   [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811
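
The arrangement described above (the operator keeps its own timeout option and copies the value into an autoscaler-scoped option) can be sketched roughly as follows. Plain maps stand in for Flink's `Configuration`, and both key strings and the class name are hypothetical; only the 10 second default is taken from the diff earlier in this thread.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: plain maps stand in for Flink's Configuration; key names are hypothetical. */
class ClientTimeoutPropagationSketch {

    // Hypothetical key owned by the operator module.
    static final String OPERATOR_TIMEOUT_KEY = "operator.flink.client.timeout";
    // Hypothetical key owned by the autoscaler module.
    static final String AUTOSCALER_TIMEOUT_KEY = "autoscaler.flink.client.timeout";
    // Default of the autoscaler option as shown in the diff: 10 seconds.
    static final Duration AUTOSCALER_DEFAULT = Duration.ofSeconds(10);

    /** Operator side: copy the job config and inject the operator-wide timeout. */
    static Map<String, Duration> createAutoscalerConf(
            Map<String, Duration> observeConf, Map<String, Duration> operatorConf) {
        Map<String, Duration> conf = new HashMap<>(observeConf);
        Duration operatorTimeout = operatorConf.get(OPERATOR_TIMEOUT_KEY);
        if (operatorTimeout != null) {
            conf.put(AUTOSCALER_TIMEOUT_KEY, operatorTimeout);
        }
        return conf;
    }

    /** Autoscaler side: reads only its own key and never sees operator classes. */
    static Duration clientTimeout(Map<String, Duration> autoscalerConf) {
        return autoscalerConf.getOrDefault(AUTOSCALER_TIMEOUT_KEY, AUTOSCALER_DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, Duration> operatorConf = new HashMap<>();
        operatorConf.put(OPERATOR_TIMEOUT_KEY, Duration.ofSeconds(30));

        Map<String, Duration> conf = createAutoscalerConf(new HashMap<>(), operatorConf);
        System.out.println(clientTimeout(conf));
    }
}
```

A standalone autoscaler that has no operator config would simply fall back to the autoscaler default.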





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1333849523


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   Hi @Samrat002 , thanks for your feedback!
   
   I have changed the method parameters of `AutoScalerStateStore` from String to specific types, such as: `Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory`.
   
   ```
   public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
   
       void storeScalingHistory(
               Context jobContext,
               Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory);
   
       Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
               Context jobContext);
   
       void removeScalingHistory(Context jobContext);
   }
   ```
   
   The PR has been updated as well.
   
   Do you think this is OK? It means the state store is responsible for serialization and deserialization, for example:
   
   - The default `KubernetesAutoScalerStateStore` serializes all states to String internally.
   - As you mentioned before, if a complex type is added in the future, each state store decides how to serialize it.
   
   Also, let me add the reason for changing these parameters:
   
   Currently, all states are stored in a ConfigMap, which has a size limit. The size limit should only apply to `KubernetesAutoScalerStateStore`, and handling it is part of serialization. So serialization and deserialization should move into the `AutoScalerStateStore`.
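
A minimal sketch of that idea, under stated assumptions: the interface below is a simplified, hypothetical variant of the state store (a `String` job key and a `SortedMap<Long, Integer>` of timestamp to parallelism instead of the real types), the in-memory map stands in for a ConfigMap, and the policy of dropping the oldest entries until the encoded value fits is an assumption made for illustration. The point is that callers only see typed values while the store owns both the encoding and the size limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;

/** Sketch only: simplified, hypothetical types; not the interfaces from the PR. */
class StateStoreSketch {

    /** Typed contract: callers never see the serialized form. */
    interface ScalingHistoryStore {
        void store(String jobKey, SortedMap<Long, Integer> history);

        Optional<SortedMap<Long, Integer>> get(String jobKey);

        void remove(String jobKey);
    }

    /** String-backed store with a size cap, standing in for a ConfigMap-backed one. */
    static final class SizeLimitedStringStore implements ScalingHistoryStore {
        private final Map<String, String> backend = new HashMap<>();
        private final int maxBytes;

        SizeLimitedStringStore(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        @Override
        public void store(String jobKey, SortedMap<Long, Integer> history) {
            TreeMap<Long, Integer> trimmed = new TreeMap<>(history);
            String encoded = encode(trimmed);
            // Assumed policy: drop the oldest entries until the value fits.
            while (encoded.getBytes(StandardCharsets.UTF_8).length > maxBytes
                    && !trimmed.isEmpty()) {
                trimmed.pollFirstEntry();
                encoded = encode(trimmed);
            }
            backend.put(jobKey, encoded);
        }

        @Override
        public Optional<SortedMap<Long, Integer>> get(String jobKey) {
            String encoded = backend.get(jobKey);
            return encoded == null ? Optional.empty() : Optional.of(decode(encoded));
        }

        @Override
        public void remove(String jobKey) {
            backend.remove(jobKey);
        }
    }

    /** Encodes entries as "timestamp=parallelism;" pairs. */
    static String encode(SortedMap<Long, Integer> history) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, Integer> e : history.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        return sb.toString();
    }

    /** Inverse of encode; empty segments are skipped. */
    static SortedMap<Long, Integer> decode(String encoded) {
        SortedMap<Long, Integer> history = new TreeMap<>();
        for (String pair : encoded.split(";")) {
            if (!pair.isEmpty()) {
                String[] kv = pair.split("=");
                history.put(Long.parseLong(kv[0]), Integer.parseInt(kv[1]));
            }
        }
        return history;
    }

    public static void main(String[] args) {
        ScalingHistoryStore store = new SizeLimitedStringStore(8);
        SortedMap<Long, Integer> history = new TreeMap<>();
        history.put(1L, 2);
        history.put(3L, 4);
        history.put(5L, 6);
        store.store("job-1", history);
        System.out.println(store.get("job-1"));
    }
}
```

A store backed by a database or by memory could implement the same interface without any size handling at all.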





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752467497

   > The refactor also completely changes how the autoscaler module is loaded in the operator. Previously we intentionally loaded this as a service so it would be possible to potentially upgrade the autoscaler without the rest of the operator.
   > 
   > Now @1996fanrui you moved the operator related autoscaler classes in the main operator module resulting in a tight coupling.
   > 
   > @mxm how do you feel about this? Originally you were the one who introduced the service loader approach :)
   
   Sorry for breaking this; I didn't know the background. And thanks to @gyfora for pointing it out.
   
   If it's still needed, I can restore the `flink-kubernetes-operator-autoscaler` module and move all kubernetes-autoscaler related classes into it. WDYT?
   
   cc @mxm 
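
For readers unfamiliar with the service loader approach mentioned in the quoted text, here is a minimal sketch using the JDK's `java.util.ServiceLoader`. The `AutoscalerPlugin` interface and the no-op fallback are hypothetical and only illustrate the mechanism; they are not the operator's actual classes. An implementation shipped in a separate jar would register itself under `META-INF/services/` and be picked up at runtime without recompiling the host.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

/** Sketch only: hypothetical plugin contract, not the operator's real interface. */
class ServiceLoaderSketch {

    /** Contract that a separately packaged autoscaler jar would implement. */
    public interface AutoscalerPlugin {
        String name();
    }

    /** Fallback used when no implementation is registered on the classpath. */
    static final class NoopAutoscaler implements AutoscalerPlugin {
        @Override
        public String name() {
            return "noop";
        }
    }

    /** Picks the first registered implementation, or the no-op fallback. */
    static AutoscalerPlugin load() {
        Iterator<AutoscalerPlugin> candidates =
                ServiceLoader.load(AutoscalerPlugin.class).iterator();
        return candidates.hasNext() ? candidates.next() : new NoopAutoscaler();
    }

    public static void main(String[] args) {
        System.out.println(load().name());
    }
}
```

Moving the implementation classes into the host module removes this indirection, which is the coupling concern raised in the quote.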




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354562864


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient<String> getRestClusterClient() throws Exception;

Review Comment:
   We intentionally avoided too many wrappers, generated clients etc whenever possible. Testing is not too bad and we would like to avoid the interface/factory hell that is currently present in core Flink :) 





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355175979


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Why not Normal, _Warning_, Error?
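
As an illustration of the suggestion above together with the `interval` semantics from the Javadoc in the diff, here is a minimal sketch. The `Warning` level, the `RecordingHandler` class, the deduplication key and the explicit `now` parameter (added for testability) are all assumptions, not part of the PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: hypothetical handler, not the interface from the PR. */
class EventHandlerSketch {

    /** Three severity levels, as suggested in the review comment. */
    enum Type { Normal, Warning, Error }

    static final class RecordingHandler {
        final List<String> emitted = new ArrayList<>();
        private final Map<String, Instant> lastEmitted = new HashMap<>();

        /** Emits the event unless the same event was emitted within the interval. */
        void handleEvent(
                Type type,
                String reason,
                String message,
                String messageKey,
                Duration interval,
                Instant now) {
            // Assumed dedup key: type, reason and messageKey (message if no key is given).
            String key = type + "/" + reason + "/" + (messageKey != null ? messageKey : message);
            Instant last = lastEmitted.get(key);
            boolean dedupEnabled =
                    interval != null && !interval.isZero() && !interval.isNegative();
            if (dedupEnabled
                    && last != null
                    && Duration.between(last, now).compareTo(interval) < 0) {
                return; // repeated within the interval: ignore
            }
            lastEmitted.put(key, now);
            emitted.add(type + ": " + message);
        }
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Instant start = Instant.ofEpochSecond(0);
        Duration interval = Duration.ofSeconds(60);
        handler.handleEvent(Type.Warning, "Scaling", "ineffective", null, interval, start);
        handler.handleEvent(
                Type.Warning, "Scaling", "ineffective", null, interval, start.plusSeconds(30));
        handler.handleEvent(
                Type.Warning, "Scaling", "ineffective", null, interval, start.plusSeconds(90));
        System.out.println(handler.emitted);
    }
}
```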



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
   The state is no longer flushed on the early-return paths after line 88 and line 93, but it was flushed there before.
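
One possible way to keep the flush on every exit path, including the early returns, is a `try`/`finally`. The following is only a minimal sketch: `FlushOnEveryPath`, its nested `StateStore`, and the boolean flags are hypothetical stand-ins and not the classes from this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: flush state on every exit path of scale(), including early returns. */
class FlushOnEveryPath {

    /** Stand-in for a state store; it only records which job keys were flushed. */
    static class StateStore {
        final List<String> flushed = new ArrayList<>();

        void flush(String jobKey) {
            flushed.add(jobKey);
        }
    }

    /** Returns true if the scaling logic ran, false if an early return was taken. */
    static boolean scale(StateStore store, String jobKey, boolean enabled, boolean running) {
        try {
            if (!enabled) {
                return false; // early return: autoscaler disabled
            }
            if (!running) {
                return false; // early return: job is not running
            }
            // ... the scaling logic would go here ...
            return true;
        } finally {
            // Executed after each of the returns above, so the flush is never skipped.
            store.flush(jobKey);
        }
    }

    public static void main(String[] args) {
        StateStore store = new StateStore();
        scale(store, "job-1", false, true); // disabled
        scale(store, "job-1", true, false); // not running
        scale(store, "job-1", true, true);  // normal path
        System.out.println(store.flushed.size()); // prints 3
    }
}
```

Whether flushing belongs in a `finally` block or should be called explicitly before each `return` is a design choice; the sketch only shows that the early returns do not have to bypass it.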



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        AutoScalerOptions
+                                                                                .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   I see! It's hard to tell what has been moved and what has not. I was concerned that returning a submap here could cause issues. It's better to clear the part of the map we no longer need and return the full map.
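
The difference between the two approaches can be illustrated with plain `java.util` collections. This is a sketch under assumptions: `TrimHistoryInPlace` and `trim` are hypothetical names, and `String` values stand in for `ScalingSummary`. One known pitfall of a `TreeMap.tailMap` view is that inserting a key below its lower bound throws `IllegalArgumentException`; clearing the outdated head of the map in place avoids that.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch: trim old entries in place instead of returning a tailMap view. */
class TrimHistoryInPlace {

    /** Removes all entries strictly older than (now - maxAge) and returns the same map. */
    static <V> SortedMap<Instant, V> trim(
            SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        // headMap(cutoff) is a view of the keys strictly less than the cutoff;
        // clearing the view removes those entries from the backing map.
        history.headMap(now.minus(maxAge)).clear();
        return history;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-10-01T00:00:00Z");
        SortedMap<Instant, String> history = new TreeMap<>();
        history.put(now.minus(Duration.ofHours(48)), "old");
        history.put(now.minus(Duration.ofHours(1)), "recent");

        SortedMap<Instant, String> trimmed = trim(history, now, Duration.ofHours(24));
        System.out.println(trimmed.size()); // prints 1

        // The full map accepts keys of any age; a tailMap view would reject this key.
        trimmed.put(now.minus(Duration.ofHours(72)), "late arrival");
        System.out.println(trimmed.size()); // prints 2
    }
}
```

The retained range is the same as with `tailMap(cutoff)`, because `tailMap` includes the cutoff key and `headMap` excludes it.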





[GitHub] [flink-kubernetes-operator] XComp commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336571925


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   The shaded Jackson dependencies for 2.14.2 are already published (see [org.apache.flink:flink-shaded-jackson:2.14.2-17.0](https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-jackson/2.14.2-17.0)). You should be able to use it even without waiting for Flink 1.18 to be released. ...or am I missing something here? :thinking: 



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient<String> getRestClusterClient() throws Exception;

Review Comment:
   Sorry for jumping in out of nowhere. I have only now started looking into FLIP-334; I should have participated in the discussion earlier but missed it. Anyway, I am wondering why we're relying on `RestClusterClient` here. Shouldn't we use the `ClusterClient` interface instead, to keep it generic and improve testability?
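
The testability argument can be sketched without any Flink dependency. Everything below is hypothetical: `JobClient` and `ScalerContext` are simplified stand-ins invented for this illustration, not Flink's `ClusterClient` or the PR's `JobAutoScalerContext`. The point is only that a context exposing an interface can be satisfied by a small stub in tests.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: a context that exposes a client interface is easy to stub. */
class InterfaceBasedContext {

    /** Simplified stand-in for a generic cluster client interface. */
    interface JobClient {
        CompletableFuture<String> getJobStatus(String jobId);
    }

    /** Simplified stand-in for an autoscaler context that hands out the client. */
    interface ScalerContext {
        JobClient getClient();
    }

    /** Code under test depends only on the interfaces, never on a concrete REST client. */
    static String describe(ScalerContext ctx, String jobId) {
        return jobId + ":" + ctx.getClient().getJobStatus(jobId).join();
    }

    public static void main(String[] args) {
        // In a test, the client is a one-line stub; no cluster or HTTP endpoint is needed.
        JobClient stub = id -> CompletableFuture.completedFuture("RUNNING");
        ScalerContext ctx = () -> stub;
        System.out.println(describe(ctx, "job-1")); // prints job-1:RUNNING
    }
}
```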





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361902603


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());

Review Comment:
   Ah sorry, now I see. Let's keep this for now





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1341955135


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   Previously the logic was to still apply the parallelism overrides to the resource even though the resource is not running, but that's not the case here anymore. 
   
   This will cause the in-place scaling to fail because during the next reconciliation the spec will not contain the parallelism overrides as the job is not in a running state.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with String type. */
+public class ConfigMapStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
+
+    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+    private final KubernetesClient kubernetesClient;
+
+    /**
+     * The cache for each resourceId may be in three situations: 1. The resourceId isn't exist :
+     * ConfigMap isn't loaded from kubernetes, or it's removed. 2. The resourceId is exist, and
+     * value is the Optional.empty() : We have loaded the ConfigMap from kubernetes, but the
+     * ConfigMap isn't created at kubernetes side. 3. The resourceId is exist, and the Optional
+     * isn't empty : We have loaded the ConfigMap from kubernetes, it may be not same with
+     * kubernetes side due to it's not flushed after updating.
+     */
+    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
+            new ConcurrentHashMap<>();
+
+    public ConfigMapStore(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    protected void putSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key, String value) {
+        getOrCreateState(jobContext).put(key, value);
+    }
+
+    protected Optional<String> getSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key) {
+        return getConfigMap(jobContext).map(configMap -> configMap.getData().get(key));
+    }
+
+    protected void removeSerializedState(KubernetesJobAutoScalerContext jobContext, String key) {
+        getConfigMap(jobContext)
+                .ifPresentOrElse(
+                        configMap -> configMap.getData().remove(key),
+                        () -> {
+                            throw new IllegalStateException(
+                                    "The configMap isn't created, so the remove is unavailable.");
+                        });
+    }
+
+    public void flush(KubernetesJobAutoScalerContext jobContext) {
+        Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
+        Preconditions.checkState(

Review Comment:
   This will fail during the autoscale stabilization period as we won't collect any metrics there. I think we could just simply log here and not throw an error.
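
A lenient `flush` along these lines could look roughly as follows. This is a sketch under assumptions: `LenientFlush`, its `String`-keyed cache, and the `writes` counter are hypothetical simplifications of the quoted `ConfigMapStore`, and `System.out.println` stands in for a logger call.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: flush() logs and returns when there is nothing to write. */
class LenientFlush {

    // Job key -> cached state; Optional.empty() means "loaded, but nothing created yet".
    private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();

    // Counts simulated writes to the backing store.
    int writes = 0;

    void put(String jobKey, String state) {
        cache.put(jobKey, Optional.of(state));
    }

    /** Returns true if state was written, false if there was nothing to flush. */
    boolean flush(String jobKey) {
        Optional<String> state = cache.get(jobKey);
        if (state == null || !state.isPresent()) {
            // No metrics were collected yet (for example during stabilization),
            // so this is reported instead of being treated as an error.
            System.out.println("Nothing to flush for " + jobKey);
            return false;
        }
        writes++; // stand-in for the real write to the backing store
        return true;
    }

    public static void main(String[] args) {
        LenientFlush store = new LenientFlush();
        System.out.println(store.flush("job-1")); // prints false, no exception
        store.put("job-1", "state");
        System.out.println(store.flush("job-1")); // prints true
    }
}
```

With this shape, callers can invoke `flush` unconditionally at the end of a reconciliation without first checking whether any state was produced.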





[GitHub] [flink-kubernetes-operator] Samrat002 commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "Samrat002 (via GitHub)" <gi...@apache.org>.
Samrat002 commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1333545703


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   
   Much appreciated, @1996fanrui, for summarizing our discussion.
   
   I would like to add one more point: the `AutoScalerEventHandler` currently offers two types, namely `warning` and `normal`. In my opinion, it would be beneficial to include an `Error` or `Fatal` option as part of the interface. This flexibility would allow different users implementing this autoscaler module to define and use these options according to their specific requirements.
   
   I would definitely appreciate hearing the thoughts of @mxm and @gyfora regarding the adoption of structured classes over strings and the proposal to introduce a new type called `Fatal` in `AutoScalerEventHandler`, as described in this thread.





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1342079785


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with String type. */
+public class ConfigMapStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
+
+    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+    private final KubernetesClient kubernetesClient;
+
+    /**
+     * The cache for each resourceId may be in three situations: 1. The resourceId isn't exist :
+     * ConfigMap isn't loaded from kubernetes, or it's removed. 2. The resourceId is exist, and
+     * value is the Optional.empty() : We have loaded the ConfigMap from kubernetes, but the
+     * ConfigMap isn't created at kubernetes side. 3. The resourceId is exist, and the Optional
+     * isn't empty : We have loaded the ConfigMap from kubernetes, it may be not same with
+     * kubernetes side due to it's not flushed after updating.
+     */
+    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
+            new ConcurrentHashMap<>();
+
+    public ConfigMapStore(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    protected void putSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key, String value) {
+        getOrCreateState(jobContext).put(key, value);
+    }
+
+    protected Optional<String> getSerializedState(
+            KubernetesJobAutoScalerContext jobContext, String key) {
+        return getConfigMap(jobContext).map(configMap -> configMap.getData().get(key));
+    }
+
+    protected void removeSerializedState(KubernetesJobAutoScalerContext jobContext, String key) {
+        getConfigMap(jobContext)
+                .ifPresentOrElse(
+                        configMap -> configMap.getData().remove(key),
+                        () -> {
+                            throw new IllegalStateException(
+                                    "The configMap isn't created, so the remove is unavailable.");
+                        });
+    }
+
+    public void flush(KubernetesJobAutoScalerContext jobContext) {
+        Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
+        Preconditions.checkState(

Review Comment:
   Some callers call `flush` at the end even if the state hasn't been updated, so your suggestion makes things easier for callers.
   
   I have updated the production code and will add more tests for it later, since I'm on vacation. Thanks~





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356512684


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   There's gonna be another release candidate (RC2) by the end of the week according to the release managers. I hope that 1.18.0 will be released within October. 
   
   Using the shaded jackson dependency from Flink (in contrast to shading the dependency within the Kubernetes operator) comes with the benefit that you don't have to maintain a dedicated entry in the NOTICE file, as far as I understand. So, it has "some advantage".
   
   Anyway, the approach can be changed later on if it becomes a problem. So, feel free to follow what you agreed on in this PR.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356620090


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Thanks for the feedback. I have reverted `Error` back to `Warning` with a force push.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354332455


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   Thanks to @mxm for the suggestion!
   
   I have moved the `waitingForRunning` logic back to the autoscaler module and restored the related tests.
   
   Also, these 2 comments [1][2] have been restored as well.
   
   [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350082356
   [2] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350079573





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766010488

   I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and it worked flawlessly.
   Thank you for the effort you've put into this. I really like the direction the autoscaler is heading.




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361881666


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());

Review Comment:
   Is this really necessary? It seems costly, and the operator itself already has a client timeout config, which is already applied.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/ScalingRealizerCollector.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/** The event handler for collecting scaling events. */
+public class ScalingRealizerCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingScalingRealizer`? That way the intention is clear.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/FlushCountableAutoscalerStateStore.java:
##########
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.state;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import lombok.SneakyThrows;
+/** The state store counts the flush. */
+public class FlushCountableAutoscalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingAutoscalerStateStore`? That way the intention is clear.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingEventCollector`? That way the intention is clear.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752501151

   @1996fanrui Sorry, I merged a PR related to event triggering from the ScalingExecutor. I will try to avoid merging further autoscaler-related PRs until this is finalised. Please take care when rebasing.




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora merged PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354340358


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());
+
+        CommonStatus<?> status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
   Hi @mxm @gyfora, could the type of `org.apache.flink.kubernetes.operator.api.status.JobStatus#state` be changed from `String` to `org.apache.flink.api.common.JobStatus`?
   
   As far as I can see, all setters already respect that enum. If the type can be changed, the `JobStatus.valueOf(state)` call is no longer necessary.
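
To illustrate the trade-off being discussed, here is a minimal sketch of what a string-typed state field costs at the call site. The enum `State` and the class `JobStateParsingSketch` are hypothetical stand-ins, not Flink classes. `Enum.valueOf` throws `IllegalArgumentException` for any string that is not an exact constant name, so a string-typed field needs a null check and error handling that an enum-typed field would make unnecessary.

```java
import java.util.Optional;

final class JobStateParsingSketch {

    /** Hypothetical stand-in for a job status enum. */
    enum State {
        CREATED,
        RUNNING,
        FAILED,
        FINISHED
    }

    /** Parses a string-typed state defensively; empty if null or unknown. */
    static Optional<State> parse(String state) {
        if (state == null) {
            return Optional.empty();
        }
        try {
            // valueOf is case-sensitive and throws on unknown names.
            return Optional.of(State.valueOf(state));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("RUNNING"));
        System.out.println(parse("running"));
        System.out.println(parse(null));
    }
}
```

With an enum-typed field the compiler rules out invalid values, at the price of having to check how the change affects the serialized CRD status, which is the caveat raised in the reply.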





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "mxm (via GitHub)" <gi...@apache.org>.
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354624451


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    public KubernetesJobAutoScalerContext getJobAutoScalerContext() {
+        if (autoScalerContext != null) {
+            return autoScalerContext;
+        }
+        return autoScalerContext = createJobAutoScalerContext();
+    }
+
+    private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
+        Configuration conf = new Configuration(getObserveConfig());
+        conf.set(
+                AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
+                getOperatorConfig().getFlinkClientTimeout());
+
+        CommonStatus<?> status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
   Sure, if that does not cause any problems with the CRD state, that is fine with me.





[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336584237


##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done

Review Comment:
   `flink-autoscaler` depends on `flink-runtime 1.17.1`, and `flink-runtime 1.17.1` depends on `flink-shaded-jackson 2.13.4-16.1`.
   
   If `flink-autoscaler` depended on `flink-shaded-jackson 2.14.2-17.0` directly, `2.13.4-16.1` and `2.14.2-17.0` might conflict, right?
   
   So there are two temporary solutions:
   
   - Solution 1: Use the unified jackson dependency of `flink-kubernetes-operator` (this is what the current PR does).
   - Solution 2: Let `flink-autoscaler` depend on `flink-shaded-jackson 2.14.2-17.0` directly, and exclude `flink-shaded-jackson` from `flink-runtime`.
   
   In the short term, both are fine with me.
   
   Hi @gyfora @mxm, you are the experts on `flink-kubernetes-operator`: could the whole `flink-kubernetes-operator` use `flink-shaded-jackson 2.14.2-17.0`?
   
   If yes, this problem is easy to solve.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766110914

   > I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and it worked flawlessly. Thank you for the effort you've put into this. I really like the direction the autoscaler is heading.
   
   Thanks @mateczagany for testing!




Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355987715


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R request) {
+                        if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap =
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   I see, removed.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handles all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When the interval is greater than 0, events that repeat within the interval
+     *     will be ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Sorry, I misunderstood your previous comment.
   
   I think two types are enough so far:
   - All current events use only `Normal` and `Error`.
   - `EventRecorder.Type` has just two types. If we had three types here, the Kubernetes event handler would have to convert them to `EventRecorder`'s two types.
   
   I would prefer to add a third type in the future if needed. Although this class is public, adding an enum constant should not cause compatibility issues, so adding types later should be easy.
   
   I don't have a strong opinion here. I will decide whether to keep two or three types after your feedback.
   
   Also, if you think two types are fine in this PR, I would prefer to keep `Normal` and `Warning`.
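
The conversion mentioned above could look roughly like the following sketch, assuming a handler-side enum with three values and a recorder-side enum with two. Both enums and the class `EventTypeMappingSketch` are hypothetical; they only mirror the names used in this thread and are not the operator's actual `EventRecorder.Type`.

```java
final class EventTypeMappingSketch {

    /** Hypothetical handler-side type with a third value. */
    enum HandlerType {
        Normal,
        Warning,
        Error
    }

    /** Hypothetical two-valued recorder-side type. */
    enum RecorderType {
        Normal,
        Warning
    }

    /** Collapses three handler types into the two recorder types. */
    static RecorderType toRecorderType(HandlerType type) {
        switch (type) {
            case Normal:
                return RecorderType.Normal;
            case Warning:
            case Error:
                // The recorder has no Error value, so Error is reported as Warning.
                return RecorderType.Warning;
            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) {
        for (HandlerType type : HandlerType.values()) {
            System.out.println(type + " -> " + toRecorderType(type));
        }
    }
}
```

One caveat on compatibility: a new enum constant is binary compatible, but callers that `switch` over the enum without a `default` branch may still need updating.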



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        AutoScalerOptions
+                                                                                .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   Thanks for the clarification. Updated it to create a new `TreeMap`.
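
The reviewer's original concern is not quoted in this message, so the following is only a general sketch of why copying can matter. `SortedMap.tailMap` returns a range-restricted view backed by the original map: writes to the view change the original, and inserting a key outside the range throws `IllegalArgumentException`. Wrapping the view in a new `TreeMap` gives an independent map. The class and method names below are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

final class TrimHistorySketch {

    /** Returns an independent copy holding only entries at or after now - maxAge. */
    static <V> SortedMap<Instant, V> trim(
            SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        // tailMap alone would be a view backed by `history`;
        // the TreeMap constructor copies the entries and detaches the result.
        return new TreeMap<>(history.tailMap(now.minus(maxAge)));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-10-01T12:00:00Z");
        TreeMap<Instant, String> history = new TreeMap<>();
        history.put(now.minus(Duration.ofHours(2)), "old decision");
        history.put(now.minus(Duration.ofMinutes(10)), "recent decision");

        SortedMap<Instant, String> trimmed = trim(history, now, Duration.ofHours(1));
        // Allowed on the copy; the same put on a bare tailMap view would throw.
        trimmed.put(now.minus(Duration.ofHours(3)), "backfilled decision");

        System.out.println("trimmed: " + trimmed);
        System.out.println("original: " + history);
    }
}
```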
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   Do you mean moving `KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here?
   
   It's a generic option for the Kubernetes operator: it is used not only by the autoscaler but also by many `AbstractFlinkService` methods. That's why I didn't move it here.
   
   WDYT?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
   As far as I can see, on the master branch these 2 `if` conditions don't call `autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());`, while the subsequent code does.
   
   That's why I extracted these 2 conditions from the `runScalingLogic` method. Please correct me if I'm wrong, thanks!
   
   
   [1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L168
   [2] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L176
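
To make the control flow under discussion concrete, here is a simplified sketch. It is not the PR's code: `ScaleFlowSketch`, `RecordingStore`, and `Status` are hypothetical stand-ins for the context, the state store, and `JobStatus`. It only illustrates the described behaviour, where the two guard conditions return before any state is flushed and the flush happens only after the scaling logic has run.

```java
import java.util.ArrayList;
import java.util.List;

public class ScaleFlowSketch {

    enum Status { RUNNING, FAILED }

    /** Hypothetical stand-in for a state store; it records calls so the flow is observable. */
    static final class RecordingStore {
        final List<String> calls = new ArrayList<>();

        void flush() {
            calls.add("flush");
        }
    }

    static void scale(boolean enabled, Status status, RecordingStore store) {
        if (!enabled) {
            return; // guard 1: autoscaler disabled, nothing is persisted
        }
        if (status != Status.RUNNING) {
            return; // guard 2: job not running, nothing is persisted
        }
        store.calls.add("runScalingLogic"); // placeholder for the real scaling logic
        store.flush(); // persisted only on this path
    }

    public static void main(String[] args) {
        RecordingStore disabled = new RecordingStore();
        scale(false, Status.RUNNING, disabled);
        System.out.println(disabled.calls);

        RecordingStore running = new RecordingStore();
        scale(true, Status.RUNNING, running);
        System.out.println(running.calls);
    }
}
```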





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360032639


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {
+
+    private final AbstractFlinkResource<?, ?> resource;
+
+    private final KubernetesClient kubernetesClient;
+
+    public KubernetesJobAutoScalerContext(
+            JobID jobID,

Review Comment:
   Thanks for pointing it out, updated.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361907227


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Thanks for the suggestions. These 3 classes were renamed in the latest commit.


