You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/23 18:26:36 UTC

[GitHub] [flink-kubernetes-operator] usamj opened a new pull request, #278: [FLINK-27443][FLINK-27444][FLINK-27445][FLINK-27446]

usamj opened a new pull request, #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278

   This is the initial MVP implementation of Standalone mode with only stateless upgrade mode currently supported.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922968515


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };

Review Comment:
   That’s right, makes sense :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922826974


##########
flink-kubernetes-mock-shaded/pom.xml:
##########
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   And it actually fixed some of my other intellij test issues



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r908191301


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java:
##########
@@ -46,6 +49,10 @@ public static Mode getMode(FlinkDeployment flinkApp) {
     }
 
     private static Mode getMode(FlinkDeploymentSpec spec) {
-        return spec.getJob() != null ? APPLICATION : SESSION;
+        KubernetesDeploymentMode deploymentMode = spec.getMode();

Review Comment:
   This could use the `getDeploymentMode` utility to avoid null checks



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java:
##########
@@ -20,11 +20,14 @@
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 
 /** The mode of {@link FlinkDeployment}. */
 public enum Mode {
-    APPLICATION,
-    SESSION;
+    NATIVE_APPLICATION,
+    NATIVE_SESSION,
+    STANDALONE_APPLICATION,
+    STANDALONE_SESSION;

Review Comment:
   Adding these new enums feels a bit forced because everwhere you use it you either check session/application or native/standalone. Maybe we can simply have 2 enums (Mode & KubernetesDeploymentMode)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] usamj commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
usamj commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922903180


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };

Review Comment:
   It looks like it mounts the TM pod template into the Pod which I assume is used by the JM to create the TM pods? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] usamj commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
usamj commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922884347


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };

Review Comment:
   Both are unneeded for TM, looking at it closer `PodTemplateMountDecorator` isn't needed for JM either. I will remove it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r905796187


##########
flink-kubernetes-mock-shaded/pom.xml:
##########
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-kubernetes-mock-shaded</artifactId>
+    <name>Flink Kubernetes Mock Shaded</name>
+    <dependencies>
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>5.5.0</version>
+            <exclusions>

Review Comment:
   It is a bit unusual to introduce a whole new module for mocking a test dependency. What is the purpose of this? Can we get rid of the module somehow?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922428482


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Standalone Kubernetes
+ * Flink clusters and jobs.
+ */
+public class StandaloneFlinkService extends AbstractFlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandaloneFlinkService.class);
+
+    public StandaloneFlinkService(
+            KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+        super(kubernetesClient, configManager);
+    }
+
+    @Override
+    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
+        LOG.info("Deploying application cluster");
+        submitClusterInternal(conf);
+        LOG.info("Application cluster successfully deployed");
+    }
+
+    @Override
+    public void submitSessionCluster(Configuration conf) throws Exception {
+        LOG.info("Deploying session cluster");
+        submitClusterInternal(conf);
+        LOG.info("Session cluster successfully deployed");
+    }
+
+    @Override
+    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception {
+        cancelJob(deployment, upgradeMode, conf, true);
+    }
+
+    @Override
+    public void deleteClusterDeployment(
+            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
+        deleteClusterInternal(meta, deleteHaData);
+    }
+
+    @Override
+    protected PodList getJmPodList(String namespace, String clusterId) {
+        return kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(StandaloneKubernetesUtils.getJobManagerSelectors(clusterId))
+                .list();
+    }
+
+    @VisibleForTesting
+    protected FlinkStandaloneKubeClient createNamespacedKubeClient(
+            Configuration configuration, String namespace) {
+        final int poolSize =
+                configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
+
+        ExecutorService executorService =
+                Executors.newFixedThreadPool(
+                        poolSize,
+                        new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));
+
+        return new Fabric8FlinkStandaloneKubeClient(
+                configuration,
+                Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient(namespace),
+                executorService);
+    }
+
+    private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException {
+        final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+
+        FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace);
+        try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
+                new KubernetesStandaloneClusterDescriptor(conf, client)) {
+            kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));

Review Comment:
   Why do we always call `deploySessionCluster` both for session and applicationclusters?
   
   There is a deployApplicationCluster method in the clusterdescriptor also.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r918804156


##########
flink-kubernetes-standalone/pom.xml:
##########
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+
+    <artifactId>flink-kubernetes-standalone</artifactId>
+    <name>Flink Kubernetes Standalone</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Test -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>${hamcrest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <artifactId>flink-kubernetes-mock-shaded</artifactId>
+            <groupId>org.apache.flink</groupId>
+            <version>1.1-SNAPSHOT</version>

Review Comment:
   this shuold be `${project.version}`



##########
flink-kubernetes-mock-shaded/pom.xml:
##########
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-kubernetes-mock-shaded</artifactId>
+    <name>Flink Kubernetes Mock Shaded</name>
+    <dependencies>

Review Comment:
   I think we need to somehow exclude this from the `mvn deploy` command so that we do not release this artifact. Otherwise we have to also make sure that NOTICES/LICENSING is correct in the jar



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -186,24 +192,24 @@ private static void setRandomJobResultStorePath(Configuration effectiveConfig) {
     }
 
     @Override
-    public boolean reconcileOtherChanges(FlinkDeployment deployment, Configuration observeConfig)
-            throws Exception {
-        if (super.reconcileOtherChanges(deployment, observeConfig)) {
+    public boolean reconcileOtherChanges(
+            FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
+        if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) {
             return true;
         }

Review Comment:
   I think we can restore the original behaviour here and call super now



##########
flink-kubernetes-standalone/pom.xml:
##########
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>

Review Comment:
   The flink-kubernetes-standalone is currently missing the NOTICE file under src/main/resources...



##########
flink-kubernetes-shaded/pom.xml:
##########
@@ -69,6 +81,16 @@ under the License.
                                     <shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern>
                                 </relocation>
                             </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/DEPENDENCIES</exclude>
+                                        <exclude>META-INF/LICENSE</exclude>
+                                        <exclude>META-INF/MANIFEST.MF</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>

Review Comment:
   This will remove the NOTICE / LINCENSE files from the shaded jar which is probably not right from a licensing perspective as we are releasing this artifact. 
   
   What do you think @wangyang0918 



##########
flink-kubernetes-standalone/pom.xml:
##########
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+
+    <artifactId>flink-kubernetes-standalone</artifactId>
+    <name>Flink Kubernetes Standalone</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Test -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>${hamcrest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <artifactId>flink-kubernetes-mock-shaded</artifactId>
+            <groupId>org.apache.flink</groupId>
+            <version>1.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.0.0-M4</version>
+            </plugin>

Review Comment:
   We seem to have this in basically every pom, could we simply move it to the project parent pom instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922890202


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };

Review Comment:
   Hm interesting, do you know why is it used in the Native integration and not here? I don't quite understand the purpose of it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#issuecomment-1185789710

   I hit a few issues during my initial local testing:
   
   1. Had a maven test failure:
   `StandaloneKubernetesJobManagerFactoryTest.testFlinkConfConfigMap:269 expected: <1> but was: <2>`
   
   2. Noticed that after running the tests some resources were deployed to my local minikube and kind of messed up the operator state there : `flink-operator-test/test-session-cluster`
   
   3. The docker build seems to fail in a fresh minikube env:
   
   ```
   > [build 4/5] RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl !flink-kubernetes-docs -DskipTests=true:
   #14 0.960 [INFO] Scanning for projects...
   #14 76.22 [ERROR] [ERROR] Some problems were encountered while processing the POMs:
   #14 76.22 [FATAL] Non-resolvable parent POM for org.apache.flink:flink-kubernetes-operator-parent:1.1-SNAPSHOT: Could not transfer artifact org.apache:apache:pom:23 from/to central (https://repo.maven.apache.org/maven2): transfer failed for https://repo.maven.apache.org/maven2/org/apache/apache/23/apache-23.pom and 'parent.relativePath' points at wrong local POM @ line 22, column 13
   #14 76.22  @
   #14 76.22 [ERROR] The build could not read 1 project -> [Help 1]
   #14 76.22 [ERROR]
   #14 76.22 [ERROR]   The project org.apache.flink:flink-kubernetes-operator-parent:1.1-SNAPSHOT (/app/pom.xml) has 1 error
   #14 76.22 [ERROR]     Non-resolvable parent POM for org.apache.flink:flink-kubernetes-operator-parent:1.1-SNAPSHOT: Could not transfer artifact org.apache:apache:pom:23 from/to central (https://repo.maven.apache.org/maven2): transfer failed for https://repo.maven.apache.org/maven2/org/apache/apache/23/apache-23.pom and 'parent.relativePath' points at wrong local POM @ line 22, column 13: Connect to repo.maven.apache.org:443 [repo.maven.apache.org/199.232.80.215] failed: Connection refused (Connection refused) -> [Help 2]
   #14 76.22 [ERROR]
   #14 76.22 [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
   #14 76.23 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
   #14 76.23 [ERROR]
   #14 76.23 [ERROR] For more information about the errors and possible solutions, please read the following articles:
   #14 76.23 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException
   #14 76.23 [ERROR] [Help 2] http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
   ------
   executor failed running [/bin/sh -c mvn -ntp clean install -pl !flink-kubernetes-docs -DskipTests=$SKIP_TESTS]: exit code: 1
   ```
   
   I think you missed copying the new module dirs in the Dockerfile for the build 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922510295


##########
flink-kubernetes-mock-shaded/pom.xml:
##########
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   I completely removed the whole mock shaded module (https://github.com/gyfora/flink-kubernetes-operator/commit/64a1ec539f56b68bf76091aabde56fcf00791467) and everything seem to still work. All tests still pass.
   
   Do we still need this? What am I missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#issuecomment-1186097180

   The problem with the JobManagerFactory test was that it picks up configs from your local `/opt/flink/conf` directory. I actually had stuff there which caused the problem.
   
   We have to specifically overwrite the conf value to point to a missing dir in the test. Something like: https://github.com/gyfora/flink-kubernetes-operator/commit/b217e0b3e433717947d7357f16a4219bc0567524


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922783719


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };

Review Comment:
   I noticed that we are missing the following two decorators here:
   
   ```
   new UserLibMountDecorator(kubernetesJobManagerParameters),
   new PodTemplateMountDecorator(kubernetesJobManagerParameters)
   ```
   
   Why don't we have these here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#issuecomment-1190248330

   closing this in favor of https://github.com/apache/flink-kubernetes-operator/pull/324


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r915920165


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -494,4 +500,42 @@ public static <SPEC extends AbstractFlinkSpec> void updateStatusForAlreadyUpgrad
         reconciliationStatus.setLastReconciledSpec(
                 ReconciliationUtils.writeSpecWithMeta(lastSpecWithMeta.f0, lastSpecWithMeta.f1));
     }
+
+    public static boolean shouldRollBackDeployment(
+            ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus,
+            Configuration configuration,
+            FlinkService flinkService) {
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+            return true;
+        }
+
+        if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
+                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
+                || reconciliationStatus.isLastReconciledSpecStable()) {
+            return false;
+        }
+
+        var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
+        if (lastStableSpec != null
+                && lastStableSpec.getJob() != null
+                && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
+            // Should not roll back to suspended state
+            return false;
+        }
+
+        Duration readinessTimeout =
+                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
+        if (!Instant.now()

Review Comment:
   This method/logic has been moved to the `AbstractFlinkResourceReconciler`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -216,30 +229,94 @@ protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
                     spec.getTaskManager().getPodTemplate(),
                     effectiveConfig,
                     false);
+
+            if (spec.getTaskManager().getReplicas() != null
+                    && spec.getTaskManager().getReplicas() > 0) {
+                effectiveConfig.set(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+                        spec.getTaskManager().getReplicas());
+            }
+        }
+
+        if (spec.getJob() != null
+                && KubernetesDeploymentMode.getDeploymentMode(spec)
+                        == KubernetesDeploymentMode.STANDALONE) {
+            final int tmTaskSlots = effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
+            final int jobParallelism = spec.getJob().getParallelism();
+            if (!effectiveConfig.contains(
+                            StandaloneKubernetesConfigOptionsInternal
+                                    .KUBERNETES_TASKMANAGER_REPLICAS)
+                    && tmTaskSlots > 0
+                    && jobParallelism > 0) {
+                effectiveConfig.set(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+                        (int) Math.ceil((double) jobParallelism / tmTaskSlots));
+            }

Review Comment:
   I think this logic could be simplified by a combination of the `getParallelism()` and `FlinkUtils#getNumTaskManagers() ` . At the moment it seems a little overly complex :) 
   
   



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -206,12 +204,6 @@ protected void rollback(CR resource, Context context, Configuration observeConfi
         reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
     }
 
-    @Override
-    public boolean reconcileOtherChanges(CR resource, Configuration observeConfig)
-            throws Exception {
-        return SavepointUtils.triggerSavepointIfNeeded(flinkService, resource, observeConfig);
-    }

Review Comment:
   Why did you remove it from here? It applies for both sessionjobs and applications



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/InternalKubernetesOperatorConfigOptions.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** This class holds internal configuration constants used by the flink operator. */
+public class InternalKubernetesOperatorConfigOptions {
+
+    public static final ConfigOption<KubernetesDeploymentMode> KUBERNETES_DEPLOYMENT_MODE =
+            key("kubernetes.operator.internal.deployment-mode")

Review Comment:
   can we prefix with `$internal.` please?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -288,41 +282,8 @@ private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployCon
      * @param configuration Flink cluster configuration.
      * @return True if the resource should be rolled back.
      */
-    private boolean shouldRollBack(
-            ReconciliationStatus<SPEC> reconciliationStatus, Configuration configuration) {
-
-        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
-            return true;
-        }
-
-        if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
-                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
-                || reconciliationStatus.isLastReconciledSpecStable()) {
-            return false;
-        }

Review Comment:
   Why did you remove this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#issuecomment-1185818168

   Also I cannot seem to run some tests from IntelliJ:
   
   `Fabric8FlinkStandaloneKubeClientTest`
   ```
   Test ignored.
   
   Test ignored.
   
   java.lang.NoSuchMethodError: 'com.fasterxml.jackson.databind.ObjectMapper io.fabric8.kubernetes.client.utils.Serialization.jsonMapper()'
   ```
   Do you have any idea how to fix this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922510295


##########
flink-kubernetes-mock-shaded/pom.xml:
##########
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   I completely removed the whole mock shaded module (https://github.com/gyfora/flink-kubernetes-operator/commit/db0fbcc3e97ce6c6c7eb9a4575ab1305fd26c34c) and everything seem to still work. All tests still pass.
   
   Do we still need this? What am I missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r922819597


##########
flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** @link Fabric8FlinkStandaloneKubeClient unit tests */
+@EnableKubernetesMockClient(crud = true)
+public class Fabric8FlinkStandaloneKubeClientTest {
+    private static final String NAMESPACE = "test";
+
+    KubernetesMockServer mockServer;
+    protected NamespacedKubernetesClient kubernetesClient;
+    private FlinkStandaloneKubeClient flinkKubeClient;
+    private StandaloneKubernetesTaskManagerParameters taskManagerParameters;
+    private Deployment tmDeployment;
+    private ClusterSpecification clusterSpecification;
+    private Configuration flinkConfig = new Configuration();
+
+    @BeforeEach
+    public final void setup() {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        kubernetesClient = mockServer.createClient();
+
+        flinkKubeClient =
+                new Fabric8FlinkStandaloneKubeClient(
+                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+        clusterSpecification = TestUtils.createClusterSpecification();
+
+        taskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification);
+
+        tmDeployment =
+                StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                        new FlinkPod.Builder().build(), taskManagerParameters);
+    }
+
+    @Test
+    public void testCreateTaskManagerDeployment() {
+        flinkKubeClient.createTaskManagerDeployment(tmDeployment);
+
+        final List<Deployment> resultedDeployments =
+                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+        assertEquals(1, resultedDeployments.size());
+    }
+
+    @Test
+    public void testStopAndCleanupCluster() throws Exception {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();

Review Comment:
   seems like duplicate code, as @BeforeEach already assigns the same values



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r923347188


##########
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.Endpoint;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.rpc.AddressResolution;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Standalone Kubernetes specific {@link ClusterDescriptor} implementation. */
+public class KubernetesStandaloneClusterDescriptor extends KubernetesClusterDescriptor {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KubernetesStandaloneClusterDescriptor.class);
+
+    private static final String CLUSTER_DESCRIPTION = "Standalone Kubernetes cluster";
+
+    private final Configuration flinkConfig;
+
+    private final FlinkStandaloneKubeClient client;
+
+    private final String clusterId;
+
+    public KubernetesStandaloneClusterDescriptor(
+            Configuration flinkConfig, FlinkStandaloneKubeClient client) {
+        super(flinkConfig, client);
+        this.flinkConfig = checkNotNull(flinkConfig);
+        this.client = checkNotNull(client);
+        this.clusterId =
+                checkNotNull(
+                        flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID),
+                        "ClusterId must be specified!");
+    }
+
+    @Override
+    public String getClusterDescription() {
+        return CLUSTER_DESCRIPTION;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deploySessionCluster(
+            ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+        final ClusterClientProvider clusterClientProvider =
+                deployClusterInternal(clusterSpecification);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created flink session cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deployApplicationCluster(
+            ClusterSpecification clusterSpecification,
+            ApplicationConfiguration applicationConfiguration)
+            throws ClusterDeploymentException {
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        applicationConfiguration.applyToConfiguration(flinkConfig);
+        final ClusterClientProvider clusterClientProvider =
+                deployClusterInternal(clusterSpecification);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created flink session cluster {} successfully, JobManager Web Interface: {}",

Review Comment:
   The log should say `application cluster` instead of `session cluster`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink
+ * clusters and jobs.
+ */
+public class NativeFlinkService extends AbstractFlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
+
+    public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+        super(kubernetesClient, configManager);
+    }
+
+    @Override
+    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
+        final ClusterClientServiceLoader clusterClientServiceLoader =

Review Comment:
   We should add ` LOG.info("Deploying application cluster");` in the beginning for consistency



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #278:
URL: https://github.com/apache/flink-kubernetes-operator/pull/278#discussion_r908188650


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -143,6 +150,14 @@ under the License.
             <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   In our current tests we spent considerable effort to avoid using mocks (and Mockito) according to the Flink codestyle: https://flink.apache.org/contributing/code-style-and-quality-common.html
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora closed pull request #278: [WIP] Add standalone mode support

Posted by GitBox <gi...@apache.org>.
gyfora closed pull request #278: [WIP] Add standalone mode support
URL: https://github.com/apache/flink-kubernetes-operator/pull/278


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org