Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/26 04:25:02 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController add…

Aitozi opened a new pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112


   … reconciler
   
   This PR is the initial implementation of `FlinkSessionJobController` and `FlinkSessionJobReconciler`. It handles the create and delete events only; other spec changes will be introduced in a follow-up ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835746415



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -123,6 +127,12 @@ spec:
                 path: flink-conf.yaml
               - key: log4j-console.properties
                 path: log4j-console.properties
+        {{- if .Values.hostPath.create }}
+        - name: {{ .Values.hostPath.name }}
+          hostPath:
+            path: {{ .Values.hostPath.hostPath }}
+            type: DirectoryOrCreate
+        {{- end }}

Review comment:
       What about providing two options here: hostPath and PersistentVolumeClaim? Users could then choose to mount a hostPath, a PersistentVolumeClaim, or neither.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835740194



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -45,7 +45,7 @@
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class BaseObserver implements Observer<FlinkDeployment> {

Review comment:
       Two more questions here:
   
   - Should we also move `JobManagerDeploymentStatus` under the crd package, since it is part of the CRD?
   - Should we also rename the old `JobObserver` and `JobReconciler` to `ApplicationObserver` and `ApplicationReconciler`? I prefer the latter names.




[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1079889065


   I am trying to get to this PR and will share my comments today.


[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841078949



##########
File path: docs/content/docs/development/guide.md
##########
@@ -108,6 +108,14 @@ So if the CRD is changed, you have to delete the CRD resource manually, and re-i
 kubectl delete crd flinkdeployments.flink.apache.org
 ```
 
+### Mounts
+
+The operator supports to specify the volume mounts. The default mounts to hostPath can be activated by the following command. You can change the default mounts in the `helm/flink-operator/values.yaml`
+
+```bash
+helm install flink-operator helm/flink-operator --set operatorVolumeMounts.create=true --set operatorVolumes.create=true 
+```
+

Review comment:
       Maybe later we should move this to the Helm page instead, but this is good for now.




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841166659



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultSessionJobValidator.java
##########
@@ -17,12 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 
 import java.util.Optional;
 
 /** Default validator implementation for {@link FlinkSessionJob}. */
-public class DefaultSessionJobValidator implements FlinkResourceValidator<FlinkSessionJob> {
+public class DefaultSessionJobValidator implements FlinkResourceValidator {

Review comment:
       Now that we have both methods in the interface, can we drop the `DefaultSessionJobValidator` and just move the logic to the `DefaultDeploymentValidator`?




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r840157549



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       I rethought this: registering a new event source for the flinkdep seems better. That way we can accelerate the reaction time when a session cluster becomes ready for job submission. I will submit a commit for it.
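
   For illustration, a minimal sketch of the lookup such an event source would rely on, assuming a hypothetical helper class (`SessionJobEventMapping` is not part of this PR); it only uses the `clusterId` and metadata accessors that already exist on the CRDs:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

/** Hypothetical helper relating a session cluster deployment to the session jobs targeting it. */
public final class SessionJobEventMapping {

    private SessionJobEventMapping() {}

    /**
     * Finds the session jobs in the same namespace whose clusterId matches the given session
     * cluster, i.e. the jobs whose reconciliation could be triggered when the cluster turns READY.
     */
    public static List<FlinkSessionJob> jobsTargeting(
            FlinkDeployment sessionCluster, List<FlinkSessionJob> allSessionJobs) {
        String clusterName = sessionCluster.getMetadata().getName();
        String namespace = sessionCluster.getMetadata().getNamespace();
        return allSessionJobs.stream()
                .filter(job -> namespace.equals(job.getMetadata().getNamespace()))
                .filter(job -> clusterName.equals(job.getSpec().getClusterId()))
                .collect(Collectors.toList());
    }
}
```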




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r840578430



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -43,8 +43,8 @@
 import java.util.Optional;
 import java.util.Set;
 
-/** Default validator implementation. */
-public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+/** Default validator implementation for {@link FlinkDeployment}. */
+public class DefaultDeploymentValidator implements FlinkResourceValidator<FlinkDeployment> {

Review comment:
       Yes, I will do it.




[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835761998



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       nit: we can add some `final` modifiers to make the code nicer.




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835766787



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       I see you have this change in your last commit; I think you should probably just revert/drop it.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835771568



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       This is meant to create the `flink-userlib` pvc. If users specify the pvc mounts, the flink-operator should be installed with `--set pvc.create=true`. There seems to be no good way to align the pvc definition with the mounts.
   Since the mounts are a complex structure, they may not be convenient to specify on the command line (from my use of helm, this may not be right :). If users have to change the volume mount name, the pvc name has to be changed correspondingly.
   
   

##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       Do you mean we do not need to be in charge of creating the PVC when installing the operator?




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836406799



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -189,6 +286,12 @@ public boolean isJobManagerPortReady(Configuration config) {
         return savepointOpt;
     }
 
+    public void cancelSessionJob(JobID jobID, Configuration conf) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);

Review comment:
       Fixed




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835914825



##########
File path: examples/basic-session-job.yaml
##########
@@ -24,6 +24,6 @@ metadata:
 spec:
   clusterId: basic-session-example
   job:
-    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    jarURI: local:///opt/flink/userlib/TopSpeedWindowing.jar

Review comment:
       Got it, will use `/opt/flink/artifacts`.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841216499



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
##########
@@ -31,4 +32,13 @@
      * @return Optional error string, should be present iff validation resulted in an error
      */
     Optional<String> validate(FlinkDeployment deployment);
+
+    /**
+     * Validate and return optional error.
+     *
+     * @param sessionJob the session job to be validated.
+     * @param session the target session cluster of the session job to be validated.
+     * @return Optional error string, should be present iff validation resulted in an error
+     */
+    Optional<String> validate(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);

Review comment:
       fixed




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835778413



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       I added an option to `operatorVolumes` and `operatorVolumeMounts`. I think the default hostPath mount should be disabled by default.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835916342



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       OK, I will create a ticket to track this.




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835766359



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       I know it's a small detail, but it's not good practice to add a bunch of `final` modifiers inside methods @bgeng777.
   
   You could add these almost everywhere if you really wanted to, but they are basically boilerplate that does not affect anything, performance or otherwise.




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841198754



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
##########
@@ -270,4 +271,37 @@
 
         return Optional.empty();
     }
+
+    // validate session job
+
+    @Override
+    public Optional<String> validate(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+
+        return firstPresent(
+                validateNotApplicationCluster(session),
+                validateSessionClusterId(sessionJob, session));
+    }
+
+    private Optional<String> validateSessionClusterId(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+        return session.map(
+                s -> {
+                    if (!s.getMetadata().getName().equals(sessionJob.getSpec().getClusterId())) {
+                        return "The session job's cluster id is not match with the session cluster";
+                    }
+                    return null;
+                });
+    }
+
+    private Optional<String> validateNotApplicationCluster(Optional<FlinkDeployment> session) {
+        return session.map(
+                s -> {
+                    if (s.getSpec().getJob() != null) {
+                        return "Can not submit to application cluster";
+                    } else {
+                        return null;

Review comment:
       Wouldn't this throw a NullPointerException in the Optional logic?
   I think this should be a `flatMap` that returns `Optional.of(error)` or `Optional.empty()`.
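
   For illustration, a minimal sketch of the `flatMap` form suggested above, reusing the same `FlinkDeployment` accessors and error message as the quoted diff (the wrapper class only exists to make the snippet compile and is not part of the PR):

```java
import java.util.Optional;

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;

class SessionJobValidationSketch {

    // Each branch returns an Optional explicitly instead of mapping to a nullable String.
    Optional<String> validateNotApplicationCluster(Optional<FlinkDeployment> session) {
        return session.flatMap(
                s ->
                        s.getSpec().getJob() != null
                                ? Optional.of("Can not submit to application cluster")
                                : Optional.empty());
    }
}
```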

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
##########
@@ -31,4 +32,13 @@
      * @return Optional error string, should be present iff validation resulted in an error
      */
     Optional<String> validate(FlinkDeployment deployment);

Review comment:
       I think this method should be called `validateDeployment`

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
##########
@@ -31,4 +32,13 @@
      * @return Optional error string, should be present iff validation resulted in an error
      */
     Optional<String> validate(FlinkDeployment deployment);
+
+    /**
+     * Validate and return optional error.
+     *
+     * @param sessionJob the session job to be validated.
+     * @param session the target session cluster of the session job to be validated.
+     * @return Optional error string, should be present iff validation resulted in an error
+     */
+    Optional<String> validate(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);

Review comment:
       I think this method should be called `validateSessionJob`
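
   For illustration, a minimal sketch of the interface with both suggested renames applied (the javadoc is shortened; this is not the merged code):

```java
import java.util.Optional;

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

/** Validator for FlinkDeployment and FlinkSessionJob resources. */
public interface FlinkResourceValidator {

    /** Validate a deployment; the optional error is present iff validation failed. */
    Optional<String> validateDeployment(FlinkDeployment deployment);

    /** Validate a session job against its target session cluster, if available. */
    Optional<String> validateSessionJob(
            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);
}
```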




[GitHub] [flink-kubernetes-operator] gyfora merged pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora merged pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112


   


[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841182486



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultSessionJobValidator.java
##########
@@ -17,12 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 
 import java.util.Optional;
 
 /** Default validator implementation for {@link FlinkSessionJob}. */
-public class DefaultSessionJobValidator implements FlinkResourceValidator<FlinkSessionJob> {
+public class DefaultSessionJobValidator implements FlinkResourceValidator {

Review comment:
       I dropped the `DefaultSessionJobValidator`; please see my last commit. The webhook has not been changed accordingly; I want to create another PR for it.
   Now I use `validate(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session)` to validate the session job, which checks the relation between the two resources. But the webhook may not have access to the secondary resource, so maybe we should only verify the session job itself in the webhook and do the full validation in the operator. What do you think?
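
   For illustration, a minimal sketch of that split, assuming the `validate(FlinkSessionJob, Optional<FlinkDeployment>)` signature from this PR (the wrapper class and method names are hypothetical):

```java
import java.util.Optional;

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;

/** Hypothetical illustration of splitting validation between webhook and operator. */
public final class SessionJobValidationSplit {

    private SessionJobValidationSplit() {}

    /** Webhook side: no access to the session cluster, so cross-resource checks are skipped. */
    public static Optional<String> validateInWebhook(
            FlinkResourceValidator validator, FlinkSessionJob sessionJob) {
        return validator.validate(sessionJob, Optional.empty());
    }

    /** Operator side: the target session cluster has been looked up, so run the full validation. */
    public static Optional<String> validateInOperator(
            FlinkResourceValidator validator,
            FlinkSessionJob sessionJob,
            Optional<FlinkDeployment> sessionCluster) {
        return validator.validate(sessionJob, sessionCluster);
    }
}
```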




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835749261



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       Okay, but let's do this in a follow-up ticket; I would like to minimize the impact on the existing logic.




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835766658



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       I think this is a good idea and will make the code cleaner. I wonder if it makes sense to restrict the context to contain one specific resource: what if we need two? We can iterate more on this design after the release.
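
   For illustration, a minimal sketch of what such a refactored interface could look like for the session job resource; the interface name and exact shape are hypothetical, not the project's final design:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;

/** Hypothetical reconciler shape where reconcile returns the UpdateControl directly. */
public interface SessionJobReconcilerWithStatus {

    /** Reconcile the resource and decide the status/spec update in one place. */
    UpdateControl<FlinkSessionJob> reconcile(
            FlinkSessionJob sessionJob, Context context, Configuration defaultConfig)
            throws Exception;

    /** Clean up external resources when the custom resource is deleted. */
    DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig);
}
```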




[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1081998286


   I have addressed your comments; some are deferred to follow-up tickets. Please take a look again @wangyang0918





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841229469



##########
File path: flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
##########
@@ -62,7 +62,7 @@
 public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
     private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
-    protected static final String VALIDATE_REQUEST_PATH = "/validate";
+    protected static final String VALIDATE_REQUEST_PATH = "/validateDeployment";

Review comment:
       This was caused by an IDE refactor; reverted.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1085828791


   I think we should try to merge this PR early next week. @Aitozi, please coordinate with @bgeng777 and @SteNicholas regarding the other outstanding PRs (https://github.com/apache/flink-kubernetes-operator/pull/141 & https://github.com/apache/flink-kubernetes-operator/pull/131) that will likely conflict with your change a little.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841079581



##########
File path: docs/content/docs/development/guide.md
##########
@@ -108,6 +108,14 @@ So if the CRD is changed, you have to delete the CRD resource manually, and re-i
 kubectl delete crd flinkdeployments.flink.apache.org
 ```
 
+### Mounts
+
+The operator supports specifying volume mounts. The default hostPath mounts can be activated with the following command. You can change the default mounts in `helm/flink-operator/values.yaml`.
+
+```bash
+helm install flink-operator helm/flink-operator --set operatorVolumeMounts.create=true --set operatorVolumes.create=true 
+```
+

Review comment:
       Agree







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836400773



##########
File path: examples/basic-session-job.yaml
##########
@@ -24,6 +24,6 @@ metadata:
 spec:
   clusterId: basic-session-example
   job:
-    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    jarURI: local:///opt/flink/userlib/TopSpeedWindowing.jar

Review comment:
       Fixed 







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836418397



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/** Resolve the jar uri. */
+public class JarResolver {
+
+    public Path resolve(String jarURI) throws Exception {

Review comment:
       I also prefer `file://` to `local://` here. I have applied the change, but we also need a follow-up ticket to support other filesystems, at least `http`.
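
   As an illustration only (not part of this PR), a minimal sketch of how `file://` and plain-path jar URIs could be resolved to a local path; the scheme check and the error for remote schemes are assumptions pending that follow-up ticket:
   ```
   import java.net.URI;
   import java.nio.file.Path;
   import java.nio.file.Paths;

   /** Sketch only: resolve file:// (or scheme-less) jar URIs to a local path. */
   public class JarResolver {

       public Path resolve(String jarURI) {
           URI uri = URI.create(jarURI);
           String scheme = uri.getScheme();
           if (scheme == null) {
               // A plain path is treated as a local file.
               return Paths.get(jarURI);
           }
           if ("file".equals(scheme)) {
               return Paths.get(uri.getPath());
           }
           // Remote schemes such as http are left to the follow-up ticket.
           throw new UnsupportedOperationException(
                   "Only file:// jar URIs are supported for now, got: " + jarURI);
       }
   }
   ```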







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841080501



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob>,
+                EventSourceInitializer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private static final String CLUSTER_ID_INDEX = "clusterId_index";
+    private static final String ALL_NAMESPACE = "allNamespace";
+
+    private final KubernetesClient kubernetesClient;
+
+    private final FlinkResourceValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            FlinkResourceValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+        this.controllerConfig = config;
+        this.informers = createInformers();
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }

Review comment:
       Thanks for the reminder, I will fix this in this PR.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1085875740


   OK, I will push another commit to address @wangyang0918's recent comments about the event source and resolve the conflicts at the same time.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835738854



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -45,7 +45,7 @@
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class BaseObserver implements Observer<FlinkDeployment> {

Review comment:
       Agree, I will do it







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835752894



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       OK







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1079674894


   Hi @gyfora I have addressed all of your comments, PTAL again.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835762111



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       I also propose a change to the `Reconciler` and `Observer` interface methods, as below:
   
   ```
   public interface Reconciler<CR, CTX extends ReconcilerContext<CR>> {
   
       UpdateControl<CR> reconcile(CR cr, CTX context) throws Exception;
   
       DeleteControl cleanup(CR cr, CTX ctx);
   }
   ```
   and 
   
   ```
       public ReconcilerContext(CR customResource, Context context, Configuration effectiveConfig) {
           this.customResource = customResource;
           this.context = context;
           this.effectiveConfig = effectiveConfig;
       }
   ```
   
   With this, we can extend the `ReconcilerContext` to carry other resource objects. For example, for a session job we need to fetch the `FlinkDeployment` to get the JobManager status and the effective config; the `FlinkDeployment` object is then used by both the observer and the reconciler. After this change, we can extend the reconciler easily.
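
   To make this concrete, a minimal hypothetical sketch of such a subclass (the class and getter names below are illustrations only, not part of this PR; it builds on the `ReconcilerContext` constructor above):
   ```
   import java.util.Optional;

   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
   import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

   import io.javaoperatorsdk.operator.api.reconciler.Context;

   /** Hypothetical context subclass that ships the related session cluster CR. */
   public class SessionJobReconcilerContext extends ReconcilerContext<FlinkSessionJob> {

       // The FlinkDeployment hosting the session cluster, looked up by the controller.
       private final Optional<FlinkDeployment> sessionCluster;

       public SessionJobReconcilerContext(
               FlinkSessionJob customResource,
               Context context,
               Configuration effectiveConfig,
               Optional<FlinkDeployment> sessionCluster) {
           super(customResource, context, effectiveConfig);
           this.sessionCluster = sessionCluster;
       }

       public Optional<FlinkDeployment> getSessionCluster() {
           return sessionCluster;
       }
   }
   ```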
   
   What do you think about this? @gyfora 
   
   







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835768453



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       Thanks for the clarification. Will consider it more carefully.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835768073



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       What does this do? How does it relate to the volume mount (`flink-userlib`) that you have specified in the values yaml?

##########
File path: helm/flink-operator/values.yaml
##########
@@ -41,11 +41,23 @@ jobServiceAccount:
     "helm.sh/resource-policy": keep
   name: "flink"
 
-hostPath:
+volumeMounts: |-
+  - name: flink-userlib
+    mountPath: /opt/flink/userlib
+
+volumes: |-
+  - name: flink-userlib
+    hostPath:
+      path: /tmp/flink/userlib
+      type: DirectoryOrCreate

Review comment:
       We could call these `operatorVolumes` and `operatorVolumeMounts` to be a bit more specific. Later we will need a more generic way to customize the operator and can get rid of this, but that is out of scope for now.
   
   Also, we should not use strings here; the user should be able to specify them in proper YAML format:
   ```
   operatorVolumeMounts:
     - name: flink-userlib
       mountPath: /opt/flink/userlib
   operatorVolumes:
     - name: flink-userlib
       hostPath:
         path: /tmp/flink/userlib
         type: DirectoryOrCreate
   ```
   
   You can then use `toYaml` in the template. There should be some other examples already.
         







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836406584



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JobSpec job = sessionJob.getSpec().getJob();
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
+        int port = conf.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+
+        try (RestClient restClient = new RestClient(conf, Executors.newSingleThreadExecutor())) {

Review comment:
       I made it a field of the FlinkService now; I think this method should be improved later. I use the `RestClient` directly here because there is no suitable method to upload the user jar. Maybe we could include this upstream.







[GitHub] [flink-kubernetes-operator] Aitozi removed a comment on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi removed a comment on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1079674894


   Hi @gyfora I have addressed all of your comments, PTAL again.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835767089



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       The `1 specific resource` is the target CR of the event, and any related CR can be added as a new field in the subclass.
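
   For illustration only, building on the hypothetical `SessionJobReconcilerContext` sketched earlier in this thread (none of these names are in the PR, and `getEffectiveConfig()` is an assumed getter on the proposed context), the reconciler could then read the related CR from the context instead of looking it up itself:
   ```
   // Hypothetical fragment of a session job reconciler using the proposed context.
   // submitJobToSessionCluster and JobManagerDeploymentStatus.READY come from this
   // PR; getSessionCluster() and getEffectiveConfig() are assumptions.
   private void deployIfSessionClusterReady(
           FlinkSessionJob sessionJob, SessionJobReconcilerContext ctx) throws Exception {
       Optional<FlinkDeployment> sessionCluster = ctx.getSessionCluster();
       if (sessionCluster.isPresent()
               && sessionCluster.get().getStatus().getJobManagerDeploymentStatus()
                       == JobManagerDeploymentStatus.READY) {
           flinkService.submitJobToSessionCluster(sessionJob, ctx.getEffectiveConfig());
       } else {
           LOG.info("Session cluster deployment is not in READY state");
       }
   }
   ```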







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835916583



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec changes.
+            LOG.info("Other spec changes are not supported yet");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       I have tried this approach a bit, but it did not quite work out. I will look into it more deeply.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835768098



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       I'm also torn on this; some other methods already use this pattern, so I followed it here. But I also think it's fussy to add `final` to every local variable.
   IMO, final local variables should only be introduced on performance-critical paths, so I prefer to drop them in these methods.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r839428005



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       Registering an InformerEventSource first requires creating an informer and then telling the SDK how to map the secondary resource to the custom resource. With this in place, when a secondary resource changes it can trigger an event for the custom resource (the session job).
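   For illustration, a rough sketch of what that registration could look like. This is only a sketch: the `prepareEventSources` signature and the `InformerEventSource` constructor depend on the java-operator-sdk version, `PrimaryResourcesRetriever` is assumed to be a functional interface, and `createFlinkDeploymentInformer()` / `sessionJobsForCluster()` are hypothetical helpers (the latter would typically look the jobs up via an index keyed by clusterId):
   
   ```java
   // Sketch only: wire a FlinkDeployment informer as an event source for FlinkSessionJob.
   @Override
   public List<EventSource> prepareEventSources(EventSourceContext<FlinkSessionJob> context) {
       // Hypothetical helper that builds a SharedIndexInformer<FlinkDeployment>.
       SharedIndexInformer<FlinkDeployment> deploymentInformer = createFlinkDeploymentInformer();
   
       // Map a changed session cluster back to every FlinkSessionJob that targets it.
       PrimaryResourcesRetriever<FlinkDeployment> retriever =
               flinkDeployment -> sessionJobsForCluster(flinkDeployment.getMetadata().getName());
   
       return List.of(new InformerEventSource<>(deploymentInformer, retriever));
   }
   ```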







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841081668



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       Previously there was a hard-coded cancel-job timeout of `1 min` for the application cluster [here](https://github.com/apache/flink-kubernetes-operator/blob/ffe0d55613764a9dd768cc20f5d96da8154c0865/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java#L187), while the default client timeout is `10 s`. I'm not sure we can use the same config for both purposes. But I can use the client timeout for now and leave it to be improved later if necessary.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841080302



##########
File path: helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
##########
@@ -24,3 +24,8 @@ appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}
+
+loggers = mine
+
+logger.mine.name = org.apache.flink.runtime.rest.RestClient
+logger.mine.level = DEBUG

Review comment:
       Oh, I forgot about it, I will remove it now.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841082716



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       and use this in both the application and sessionjob cancellation







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1079601777


   cc @gyfora @wangyang0918 





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841082244



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       FYI, I now use the common client timeout for all of the timeout placeholders, e.g. jar upload, jar run and cancel job. I think we should revisit which timeout configs we need to expose to users. What do you think?







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841182486



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultSessionJobValidator.java
##########
@@ -17,12 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 
 import java.util.Optional;
 
 /** Default validator implementation for {@link FlinkSessionJob}. */
-public class DefaultSessionJobValidator implements FlinkResourceValidator<FlinkSessionJob> {
+public class DefaultSessionJobValidator implements FlinkResourceValidator {

Review comment:
       I dropped the `DefaultSessionJobValidator`. Please see my last commit. The webhook has not been changed accordingly; I want to create another PR for that. 
   Now I use `validate(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session)` to validate the session job, which also checks the relation between the two resources. The webhook, however, may not have access to the secondary resource. Maybe we could only verify the session job itself in the webhook and do the full validation in the operator. What do you think of this?
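   To make the discussion concrete, a rough sketch of a non-generic validator with explicit methods (method names are illustrative, not the final API). The webhook could call the session-job method with `Optional.empty()`, while the operator passes the resolved session cluster for the full cross-resource check:
   
   ```java
   /** Sketch of an explicit, non-generic validator; method names are illustrative only. */
   public interface FlinkResourceValidator {
   
       /** Validate a FlinkDeployment spec. */
       Optional<String> validateDeployment(FlinkDeployment deployment);
   
       /**
        * Validate a FlinkSessionJob, optionally against the session cluster it targets.
        * A webhook without access to the secondary resource can pass Optional.empty().
        */
       Optional<String> validateSessionJob(
               FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);
   }
   ```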







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r840180407



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       Yes. What I mean is that using `InformerEventSource` does not put any additional load on the K8s APIServer compared with a naked informer.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835773306



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       I think we could provide an example in the docs, but maybe it would be easier not to bother with the PV/PVC stuff; it is super complex with a lot of options. Now, with the mount configs, users can mount any PVC easily.
   
   Let's provide a simple example in the docs that works on minikube instead :) 







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841079315



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       I think the client timeout can serve the same purpose. So we can probably remove this config

##########
File path: helm/flink-kubernetes-operator/templates/flink-operator.yaml
##########
@@ -180,4 +186,4 @@ data:
 {{- if index (.Values.flinkDefaultConfiguration) "log4j-console.properties" }}
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
-{{- end }}
+{{- end }}

Review comment:
       missing newline at the end

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob>,
+                EventSourceInitializer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private static final String CLUSTER_ID_INDEX = "clusterId_index";
+    private static final String ALL_NAMESPACE = "allNamespace";
+
+    private final KubernetesClient kubernetesClient;
+
+    private final FlinkResourceValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            FlinkResourceValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+        this.controllerConfig = config;
+        this.informers = createInformers();
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }

Review comment:
       For the JobController we have changed the order of validate/observe and made sure the observer uses the config based on the lastReconciled spec.
   
   Please include this change here as well, or at least open a ticket to track it. I think it's okay if we fix it later, but we need a ticket so we don't forget :)

##########
File path: helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
##########
@@ -24,3 +24,8 @@ appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}
+
+loggers = mine
+
+logger.mine.name = org.apache.flink.runtime.rest.RestClient
+logger.mine.level = DEBUG

Review comment:
       Should we remove these before merging?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841082654



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       I see, good point. Let's keep the cancel timeout but improve the config name to `operator.reconciler.flink.cancel.timeout`.
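   Assuming the usual Flink `ConfigOptions` builder (as used for the other options in this class), the renamed option could look roughly like this; the default value and description are illustrative:
   
   ```java
   public static final ConfigOption<Duration> OPERATOR_RECONCILER_FLINK_CANCEL_TIMEOUT =
           ConfigOptions.key("operator.reconciler.flink.cancel.timeout")
                   .durationType()
                   .defaultValue(Duration.ofMinutes(1))
                   .withDescription(
                           "The timeout for the reconciler to wait for a Flink job cancellation to complete.");
   ```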







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841216699



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
##########
@@ -270,4 +271,37 @@
 
         return Optional.empty();
     }
+
+    // validate session job
+
+    @Override
+    public Optional<String> validate(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+
+        return firstPresent(
+                validateNotApplicationCluster(session),
+                validateSessionClusterId(sessionJob, session));
+    }
+
+    private Optional<String> validateSessionClusterId(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+        return session.map(
+                s -> {
+                    if (!s.getMetadata().getName().equals(sessionJob.getSpec().getClusterId())) {
+                        return "The session job's cluster id is not match with the session cluster";
+                    }
+                    return null;
+                });
+    }
+
+    private Optional<String> validateNotApplicationCluster(Optional<FlinkDeployment> session) {
+        return session.map(
+                s -> {
+                    if (s.getSpec().getJob() != null) {
+                        return "Can not submit to application cluster";
+                    } else {
+                        return null;

Review comment:
       The null value can be handled by `Optional.ofNullable` inside the `map` function, but the `flatMap` style reads more clearly. Adjusted accordingly.
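   For comparison, a small self-contained sketch of the two styles; the validation message and the field being checked are made up for illustration:
   
   ```java
   import java.util.Optional;
   
   public class OptionalValidationStyles {
   
       // map(): a null returned by the mapper is turned into Optional.empty() internally.
       static Optional<String> withMap(Optional<String> clusterId) {
           return clusterId.map(id -> id.isEmpty() ? "cluster id must not be empty" : null);
       }
   
       // flatMap(): the mapper states explicitly whether a validation error is present.
       static Optional<String> withFlatMap(Optional<String> clusterId) {
           return clusterId.flatMap(
                   id -> id.isEmpty()
                           ? Optional.of("cluster id must not be empty")
                           : Optional.empty());
       }
   
       public static void main(String[] args) {
           System.out.println(withMap(Optional.of("")));        // Optional[cluster id must not be empty]
           System.out.println(withFlatMap(Optional.of("abc"))); // Optional.empty
       }
   }
   ```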







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841219042



##########
File path: flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
##########
@@ -62,7 +62,7 @@
 public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
     private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
-    protected static final String VALIDATE_REQUEST_PATH = "/validate";
+    protected static final String VALIDATE_REQUEST_PATH = "/validateDeployment";

Review comment:
       I think you need to adjust the webhook template in the helm chart if you are changing this







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r840577763



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -43,8 +43,8 @@
 import java.util.Optional;
 import java.util.Set;
 
-/** Default validator implementation. */
-public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+/** Default validator implementation for {@link FlinkDeployment}. */
+public class DefaultDeploymentValidator implements FlinkResourceValidator<FlinkDeployment> {

Review comment:
       As we discussed in the other PR, could we change this to explicitly have two validation methods, one for SessionJob and one for Deployment, and remove the generic logic?







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841082863



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
##########
@@ -69,4 +69,10 @@
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =

Review comment:
       OK







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1086656035


   From my perspective, I think it's ready for review again. Please take a look. cc @gyfora @wangyang0918 





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835762928



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       Fixed







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835773337



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -191,3 +183,20 @@ data:
   {{- index (.Values.flinkDefaultConfiguration) "log4j-console.properties" | nindent 4 -}}
 {{- end }}
 {{- end }}
+---
+{{- if .Values.pvc.create }}
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.pvc.name }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: {{ .Values.pvc.size }}
+{{- end }}

Review comment:
       and we can remove the PVC section here completely







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835902072



##########
File path: helm/flink-operator/values.yaml
##########
@@ -41,6 +41,28 @@ jobServiceAccount:
     "helm.sh/resource-policy": keep
   name: "flink"
 
+operatorVolumeMounts:
+  create: false
+  data:
+    - name: flink-userlib

Review comment:
       I would also suggest `flink-artifacts` for the name.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/** Resolve the jar uri. */
+public class JarResolver {
+
+    public Path resolve(String jarURI) throws Exception {

Review comment:
       I am thinking whether we should use `local://` here. Because `local://` in the FlinkSessionJob has different meaning with FlinkDeployment CR. Maybe the `file://` scheme is more appropriate. Or add least we need to document this clearly.
   
   This could be done as a follow-up ticket.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       Always using `final` for local variables is not mandatory. But I don't think `final` is completely useless: it helps a lot when you do not expect a variable to be overridden accidentally, especially in a very long method.
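   A tiny self-contained illustration of that point (names are made up): with `final`, an accidental reassignment later in a long method fails at compile time instead of silently changing behaviour.
   
   ```java
   public class FinalLocalExample {
   
       static String clusterKey(String namespace, String clusterId) {
           final String key = namespace + "/" + clusterId;
           // key = clusterId;  // would not compile: cannot assign a value to final variable 'key'
           return key;
       }
   
       public static void main(String[] args) {
           System.out.println(clusterKey("default", "basic-session-example"));
       }
   }
   ```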

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       Let's have more discussion in a dedicated ticket about changing the `Reconciler` and `Observer` interfaces.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JobSpec job = sessionJob.getSpec().getJob();
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
+        int port = conf.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+
+        try (RestClient restClient = new RestClient(conf, Executors.newSingleThreadExecutor())) {

Review comment:
       The executor is not shut down correctly.
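
   For illustration, a minimal, self-contained sketch of the cleanup pattern this comment asks for; the generic submitted task stands in for the actual `RestClient` jar upload from the diff above, so this is only an illustration, not the operator code:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class ExecutorShutdownSketch {
       public static void main(String[] args) throws Exception {
           // Keep a handle on the executor instead of creating it inline,
           // so it can be shut down once the REST call has completed.
           ExecutorService executorService = Executors.newSingleThreadExecutor();
           try {
               // Stand-in for the jar upload performed via RestClient in the diff above.
               executorService.submit(() -> System.out.println("uploading user jar ...")).get();
           } finally {
               // Without this, the single worker thread keeps running after the upload.
               executorService.shutdownNow();
           }
       }
   }
   ```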

##########
File path: examples/basic-session-job.yaml
##########
@@ -24,6 +24,6 @@ metadata:
 spec:
   clusterId: basic-session-example
   job:
-    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    jarURI: local:///opt/flink/userlib/TopSpeedWindowing.jar

Review comment:
       nit: I would like to use `/opt/flink/artifacts` here to avoid confusion with `/opt/flink/usrlib`. `/opt/flink/usrlib` is a special directory from which all jars are automatically loaded by the user classloader.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -189,6 +286,12 @@ public boolean isJobManagerPortReady(Configuration config) {
         return savepointOpt;
     }
 
+    public void cancelSessionJob(JobID jobID, Configuration conf) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);

Review comment:
       We could use `operatorConfiguration.getFlinkClientTimeout()`.
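
   As a rough sketch of that suggestion, with a plain `CompletableFuture` standing in for the cancel call and a fixed `Duration` standing in for the value returned by `operatorConfiguration.getFlinkClientTimeout()` (both are assumptions for illustration only):

   ```java
   import java.time.Duration;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;

   public class ClientTimeoutSketch {
       public static void main(String[] args) throws Exception {
           // Stand-in for operatorConfiguration.getFlinkClientTimeout().
           Duration clientTimeout = Duration.ofSeconds(60);

           // Stand-in for clusterClient.cancel(jobID): wait on it with the configured
           // timeout instead of the hard-coded 1 minute.
           CompletableFuture<Void> cancelFuture = CompletableFuture.completedFuture(null);
           cancelFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
           System.out.println("Cancel acknowledged within " + clientTimeout);
       }
   }
   ```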

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       Could we register a new event source in `FlinkSessionJobController` and use `context.getSecondaryResource()` to get the FlinkDeployment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841080204



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob>,
+                EventSourceInitializer<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private static final String CLUSTER_ID_INDEX = "clusterId_index";
+    private static final String ALL_NAMESPACE = "allNamespace";
+
+    private final KubernetesClient kubernetesClient;
+
+    private final FlinkResourceValidator validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            FlinkResourceValidator validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+        this.controllerConfig = config;
+        this.informers = createInformers();
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly
+            observer.observe(flinkSessionJob, context);
+            reconciler.reconcile(flinkSessionJob, context, defaultConfig.getFlinkConfig());
+        } catch (Exception e) {
+            throw new ReconciliationException(e);
+        }
+
+        return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob)
+                .rescheduleAfter(operatorConfiguration.getReconcileInterval().toMillis());
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
+        LOG.info("Deleting FlinkSessionJob");
+
+        return reconciler.cleanup(sessionJob, context, defaultConfig.getFlinkConfig());
+    }
+
+    @Override
+    public Optional<FlinkSessionJob> updateErrorStatus(
+            FlinkSessionJob flinkSessionJob, RetryInfo retryInfo, RuntimeException e) {
+        LOG.warn(
+                "Attempt count: {}, last attempt: {}",
+                retryInfo.getAttemptCount(),
+                retryInfo.isLastAttempt());
+
+        ReconciliationUtils.updateForReconciliationError(
+                flinkSessionJob,
+                (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
+        return Optional.of(flinkSessionJob);
+    }
+
+    @Override
+    public List<EventSource> prepareEventSources(
+            EventSourceContext<FlinkSessionJob> eventSourceContext) {
+        Preconditions.checkNotNull(controllerConfig, "Controller config cannot be null");
+        Set<String> effectiveNamespaces = controllerConfig.getEffectiveNamespaces();
+        if (effectiveNamespaces.isEmpty()) {
+            return List.of(createFlinkDepInformerEventSource(ALL_NAMESPACE));
+        } else {
+            return effectiveNamespaces.stream()
+                    .map(this::createFlinkDepInformerEventSource)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private InformerEventSource<FlinkDeployment, FlinkSessionJob> createFlinkDepInformerEventSource(
+            String name) {
+        return new InformerEventSource<>(
+                kubernetesClient.resources(FlinkDeployment.class).runnableInformer(0),
+                primaryResourceRetriever(),
+                sessionJob ->
+                        new ResourceID(
+                                sessionJob.getSpec().getClusterId(),
+                                sessionJob.getMetadata().getNamespace()),
+                false) {
+            @Override
+            public String name() {
+                return name;
+            }
+        };
+    }
+
+    /**
+     * Mapping the {@link FlinkDeployment} session cluster to {@link FlinkSessionJob}. It leverages
+     * the informer indexer.
+     *
+     * @return The {@link PrimaryResourcesRetriever}.
+     */
+    private PrimaryResourcesRetriever<FlinkDeployment> primaryResourceRetriever() {
+        return flinkDeployment -> {
+            var namespace = flinkDeployment.getMetadata().getNamespace();
+            var informer =
+                    controllerConfig.getEffectiveNamespaces().isEmpty()
+                            ? informers.get(ALL_NAMESPACE)
+                            : informers.get(namespace);
+
+            var sessionJobs =
+                    informer.getIndexer()
+                            .byIndex(CLUSTER_ID_INDEX, flinkDeployment.getMetadata().getName());
+            var resourceIDs = new HashSet<ResourceID>();
+            for (FlinkSessionJob sessionJob : sessionJobs) {
+                resourceIDs.add(
+                        new ResourceID(
+                                sessionJob.getMetadata().getName(),
+                                sessionJob.getMetadata().getNamespace()));
+            }
+            LOG.debug(
+                    "Find the target resource {} for {} ",
+                    resourceIDs,
+                    flinkDeployment.getMetadata().getNamespace());
+            return resourceIDs;
+        };
+    }
+
+    /**
+     * Create informers for session job to build indexer for cluster to session job relations.
+     *
+     * @return The different namespace's index informer.
+     */
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> createInformers() {

Review comment:
       To clarify here: we maintain an extra informer index for `FlinkSessionJob` outside the controller's internal informer, because the SDK does not support building an indexer on top of it before the event sources are prepared. I have created an [issue](https://github.com/java-operator-sdk/java-operator-sdk/issues/1114) to discuss this with the java-operator-sdk community; it seems this can be supported upstream, after which we can drop this extra indexer builder.
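
   To make the indexing idea above concrete, here is a small stand-alone sketch of what the `CLUSTER_ID_INDEX` provides conceptually: a lookup from a session cluster's id to the session jobs running on it. Plain collections are used instead of the fabric8 `SharedIndexInformer`, so this only illustrates the mapping, not the operator code:

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class ClusterIdIndexSketch {
       // Simplified stand-in for a FlinkSessionJob: only the fields the index needs.
       record SessionJob(String name, String namespace, String clusterId) {}

       public static void main(String[] args) {
           List<SessionJob> cachedJobs = List.of(
                   new SessionJob("job-a", "default", "basic-session-example"),
                   new SessionJob("job-b", "default", "basic-session-example"),
                   new SessionJob("job-c", "default", "other-session"));

           // Conceptually what the informer index maintains: clusterId -> session jobs,
           // so a FlinkDeployment event can be mapped back to the jobs attached to it.
           Map<String, List<SessionJob>> byClusterId =
                   cachedJobs.stream().collect(Collectors.groupingBy(SessionJob::clusterId));

           System.out.println(byClusterId.get("basic-session-example"));
       }
   }
   ```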




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi edited a comment on pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi edited a comment on pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#issuecomment-1085875740


   OK, I will push the next commit to address @wangyang0918's recent comments about the event source, resolve the conflicts at the same time, and ping you for the final review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r837583495



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       Technically, an event source for `FlinkDeployment` would have to figure out which `FlinkSessionJob` to trigger, and that lookup would still have to go to the apiserver or an informer cache, because we cannot derive the target `FlinkSessionJob` from the `FlinkDeployment` CR alone.
   In the case of `FlinkDeploymentController`, a change to a `Deployment` can be traced back to the target `FlinkDeployment` via the deployment metadata.
   On the other hand, whether we need to register a new event source for `FlinkDeployment` should depend on whether we have to react to changes of the `FlinkDeployment`. Since upgrading or deleting the session cluster should only happen when there is no active session job, I think we do not need to register an event source for the session `FlinkDeployment`.
   But we do need a lightweight way to get the target `FlinkDeployment`, so I prefer to use an informer to cache the `FlinkDeployment` for that.
   
   What do you think of this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835738785



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       IMO, `ReconciliationUtils` is a tool that can be invoked from either the controller or the reconciler.
   I want to return the UpdateControl from the reconciler because the reconciler can decide it directly. Currently, `FlinkDeploymentController` infers the UpdateControl from the flinkApp; if we push this into the reconciler, it will be clearer. BTW, `io.javaoperatorsdk.operator.api.reconciler.Reconciler#reconcile` also returns an UpdateControl.
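
   A toy sketch of the shape being proposed, with simplified stand-ins for the resource and `UpdateControl` types (not the real java-operator-sdk or operator interfaces):

   ```java
   public class ReconcilerReturnSketch {
       // Simplified stand-ins for the CR and for UpdateControl.
       static class FlinkSessionJob { String name = "example-job"; }
       static class UpdateControl<T> {
           final T resource;
           final boolean patchStatus;
           UpdateControl(T resource, boolean patchStatus) {
               this.resource = resource;
               this.patchStatus = patchStatus;
           }
       }

       // The proposal: the reconciler itself decides and returns the UpdateControl,
       // instead of the controller inferring it from the resource afterwards.
       interface Reconciler<T> {
           UpdateControl<T> reconcile(T resource);
       }

       public static void main(String[] args) {
           Reconciler<FlinkSessionJob> reconciler = job -> new UpdateControl<>(job, true);
           UpdateControl<FlinkSessionJob> control = reconciler.reconcile(new FlinkSessionJob());
           System.out.println("Patch status of " + control.resource.name + ": " + control.patchStatus);
       }
   }
   ```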




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835769349



##########
File path: helm/flink-operator/values.yaml
##########
@@ -41,11 +41,23 @@ jobServiceAccount:
     "helm.sh/resource-policy": keep
   name: "flink"
 
-hostPath:
+volumeMounts: |-
+  - name: flink-userlib
+    mountPath: /opt/flink/userlib
+
+volumes: |-
+  - name: flink-userlib
+    hostPath:
+      path: /tmp/flink/userlib
+      type: DirectoryOrCreate

Review comment:
       Nice suggestion




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835733585



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -45,7 +45,7 @@
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class BaseObserver implements Observer<FlinkDeployment> {

Review comment:
       We could rename this to BaseDeploymentObserver/AbstractDeploymentObserver and move deployment-related stuff under a `deployment` package; similarly, we could have a `job` package for session jobs.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+        }
+        // TODO reconcile other spec change.
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster has disappeared.");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster has not been ready");

Review comment:
       `Session cluster deployment is not in READY state`

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       I don't think the reconciler should return the UpdateControl; we have already changed this a few times before arriving at the current state, where we keep the logic in ReconciliationUtils.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -28,7 +28,7 @@
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 
 /** BaseReconciler with functionality that is common to job and session modes. */
-public abstract class BaseReconciler implements Reconciler {
+public abstract class BaseReconciler implements Reconciler<FlinkDeployment> {

Review comment:
       As with the BaseObserver, this could be called BaseDeploymentReconciler / AbstractDeploymentReconciler and moved to a separate package.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+        }
+        // TODO reconcile other spec change.
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster has disappeared.");

Review comment:
       `Session cluster deployment not available`

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;

Review comment:
       this could go into a `job`/`sessionjob` package

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/InternalValidator.java
##########
@@ -17,18 +17,20 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
 import java.util.Optional;
 
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/**
+ * Validator for custom resources.
+ *
+ * @param <CR> The custom resource to be validated.
+ */
+public interface InternalValidator<CR> {

Review comment:
       Maybe `FlinkResourceValidator` would be a better name.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+        }
+        // TODO reconcile other spec change.

Review comment:
       We could at least log something here for the users for the time being

##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -123,6 +127,12 @@ spec:
                 path: flink-conf.yaml
               - key: log4j-console.properties
                 path: log4j-console.properties
+        {{- if .Values.hostPath.create }}
+        - name: {{ .Values.hostPath.name }}
+          hostPath:
+            path: {{ .Values.hostPath.hostPath }}
+            type: DirectoryOrCreate
+        {{- end }}

Review comment:
       This logic is minikube-specific, right? Maybe it would be better to allow the user to define a volume mount directly. That way they can use hostPath / persistentVolumeClaim etc.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,82 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try {
+            final RestClusterClient<String> clusterClient =

Review comment:
       Should use the `try (RestClusterClient<String> clusterClient = ...) { ... }` pattern.
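
   For readers less familiar with the pattern, a minimal self-contained example of try-with-resources; the fake client below stands in for the actual cluster client (which is `AutoCloseable`), and the point is simply that `close()` runs even if the request throws:

   ```java
   public class TryWithResourcesSketch {
       // Fake client standing in for the real REST cluster client; only illustrates
       // that try-with-resources closes it automatically.
       static class FakeClusterClient implements AutoCloseable {
           void sendRequest() {
               System.out.println("sending jar run request");
           }
           @Override
           public void close() {
               System.out.println("client closed");
           }
       }

       public static void main(String[] args) {
           try (FakeClusterClient clusterClient = new FakeClusterClient()) {
               clusterClient.sendRequest();
           } // close() runs here, whether or not sendRequest() threw
       }
   }
   ```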




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835740194



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -45,7 +45,7 @@
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class BaseObserver implements Observer<FlinkDeployment> {

Review comment:
       Two more questions here:
   
   - Should we also move `JobManagerDeploymentStatus` under the `crd` package, since it is part of the CRD?
   - Should we also rename the old `JobObserver` and `JobReconciler` to `ApplicationObserver`/`ApplicationJobObserver` and so on? I prefer the latter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835750805



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
##########
@@ -45,7 +45,7 @@
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class BaseObserver implements Observer<FlinkDeployment> {

Review comment:
       1. Yes, let's move it to the `status` package
   2. I think it should be called `ApplicationReconciler`/`ApplicationObserver` as it observes/reconciles both job and deployment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835767089



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       The `1 specific resource` is the target CR of the event. The other resources can be added as new fields in the subclass.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835746415



##########
File path: helm/flink-operator/templates/flink-operator.yaml
##########
@@ -123,6 +127,12 @@ spec:
                 path: flink-conf.yaml
               - key: log4j-console.properties
                 path: log4j-console.properties
+        {{- if .Values.hostPath.create }}
+        - name: {{ .Values.hostPath.name }}
+          hostPath:
+            path: {{ .Values.hostPath.hostPath }}
+            type: DirectoryOrCreate
+        {{- end }}

Review comment:
       What about providing two options here: hostPath and persistentVolumeClaim? Users could then choose to mount a hostPath, a persistentVolumeClaim, or neither.
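
   For comparison, one possible shape of such a two-option layout (illustrative names only; the user would enable at most one of the two sources):

       userlibVolume:
         hostPath:
           enabled: false
           path: /tmp/flink/userlib
         persistentVolumeClaim:
           enabled: false
           claimName: flink-userlib-pvc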




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835749297



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.validation.InternalValidator;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private final KubernetesClient kubernetesClient;
+
+    private final InternalValidator<FlinkSessionJob> validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            InternalValidator<FlinkSessionJob> validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        FlinkSessionJob originalCopy = ReconciliationUtils.clone(flinkSessionJob);
+        LOG.info("Starting reconciliation");
+        Optional<String> validationError = validator.validate(flinkSessionJob);
+        if (validationError.isPresent()) {
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl directly

Review comment:
       This can wait until after the release, I think :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835766490



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       We can leave this for now if you want, but let's not use `final` modifiers on local variables inside methods unless there is a very good reason for it. Class fields are of course very different, and those should be `final` whenever possible.
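
   A tiny illustrative example of the convention (the class and method names are made up for the sketch):

       public class ExampleService {
           private final KubernetesClient kubernetesClient; // class field: final where possible

           public ExampleService(KubernetesClient kubernetesClient) {
               this.kubernetesClient = kubernetesClient;
           }

           public String namespaceOf(FlinkSessionJob sessionJob) {
               String namespace = sessionJob.getMetadata().getNamespace(); // plain local, no 'final'
               return namespace;
           }
       }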




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r836406895



##########
File path: helm/flink-operator/values.yaml
##########
@@ -41,6 +41,28 @@ jobServiceAccount:
     "helm.sh/resource-policy": keep
   name: "flink"
 
+operatorVolumeMounts:
+  create: false
+  data:
+    - name: flink-userlib

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r839408971



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        if (lastReconciledSpec == null) {
+            deployFlinkJob(flinkSessionJob, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    private void deployFlinkJob(FlinkSessionJob sessionJob, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional = getFlinkDeployment(sessionJob);
+        if (flinkDepOptional.isPresent()
+                && (flinkDepOptional.get().getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY)) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+        } else {
+            LOG.info("Session cluster deployment is not in READY state");
+        }
+    }
+
+    private Optional<FlinkDeployment> getFlinkDeployment(FlinkSessionJob sessionJob) {

Review comment:
       What's the difference between registering an `InformerEventSource` and a manually created informer? I just think registering a new event source and using `getSecondaryResource()` is easier to understand.
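
   For context, a hedged sketch of the event-source approach being discussed, assuming a FlinkDeployment event source has been registered for this controller so the framework can supply the secondary resource (java-operator-sdk API details vary by version; fields such as `flinkService` and `LOG` are the ones from the reconciler diff above):

       @Override
       public void reconcile(
               FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
               throws Exception {
           Optional<FlinkDeployment> session = context.getSecondaryResource(FlinkDeployment.class);
           if (!session.isPresent()
                   || session.get().getStatus().getJobManagerDeploymentStatus()
                           != JobManagerDeploymentStatus.READY) {
               LOG.info("Session cluster deployment is not in READY state");
               return;
           }
           Configuration effectiveConfig =
                   FlinkUtils.getEffectiveConfig(session.get(), defaultConfig);
           flinkService.submitJobToSessionCluster(flinkSessionJob, effectiveConfig);
           ReconciliationUtils.updateForSpecReconciliationSuccess(flinkSessionJob);
       }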




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841182486



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultSessionJobValidator.java
##########
@@ -17,12 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 
 import java.util.Optional;
 
 /** Default validator implementation for {@link FlinkSessionJob}. */
-public class DefaultSessionJobValidator implements FlinkResourceValidator<FlinkSessionJob> {
+public class DefaultSessionJobValidator implements FlinkResourceValidator {

Review comment:
       I have dropped the `DefaultSessionJobValidator`; please see my last commit. The webhook has not been changed accordingly yet; I want to create another PR for it.
   Now I use `validate(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session)` to validate the session job, which checks the relationship between the two. But the webhook may not have access to the secondary resource, so maybe we could only verify the target session job in the webhook and do the full validation in the operator. What do you think of this?
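
   A hedged sketch of what that split could look like (the method name and the concrete checks below are placeholders, not the committed API; only the `validate(FlinkSessionJob, Optional<FlinkDeployment>)` shape comes from the comment above):

       public Optional<String> validateSessionJob(
               FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
           // Webhook-level check: needs only the session job itself.
           if (sessionJob.getSpec().getJob() == null) {
               return Optional.of("The job spec should not be empty");
           }
           // Operator-level check: needs the secondary resource, so it runs in the reconcile loop.
           if (!session.isPresent()) {
               return Optional.of(
                       "Target session cluster "
                               + sessionJob.getSpec().getClusterId()
                               + " is not found");
           }
           return Optional.empty();
       }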




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r841216442



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
##########
@@ -31,4 +32,13 @@
      * @return Optional error string, should be present iff validation resulted in an error
      */
     Optional<String> validate(FlinkDeployment deployment);

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org