Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/04 14:42:44 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request #41: [FLINK-26377] Extract the Reconciler interface

Aitozi opened a new pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41


   This PR extracts the reconciler interface and creates reconciler instances on the fly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819822316



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(

Review comment:
       IMO, `FlinkReconciler` could expose a method like `getMode`, and each reconciler could be registered with the `FlinkReconcilerFactory` under that mode. `getOrCreate` could then return the reconciler by looking up the mode.
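
To make the suggestion concrete, here is a minimal sketch of what registration by mode might look like. Everything in it is hypothetical: `RegistrySketch`, the nested `Reconciler` stand-in, `ApplicationReconciler`, and `ReconcilerRegistry` are illustrative names rather than the operator's actual classes, and `reconcile` is reduced to a stub that returns a string.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch of registration by mode; these are not the operator's real types. */
final class RegistrySketch {

    enum Mode { APPLICATION, SESSION }

    /** Stand-in for the reconciler interface, extended with the suggested getMode(). */
    interface Reconciler {
        Mode getMode();

        String reconcile(String deploymentName);
    }

    static final class SessionReconciler implements Reconciler {
        public Mode getMode() { return Mode.SESSION; }

        public String reconcile(String deploymentName) { return "session:" + deploymentName; }
    }

    static final class ApplicationReconciler implements Reconciler {
        public Mode getMode() { return Mode.APPLICATION; }

        public String reconcile(String deploymentName) { return "application:" + deploymentName; }
    }

    /** Each reconciler announces its own mode, so the registry needs no switch. */
    static final class ReconcilerRegistry {
        private final Map<Mode, Reconciler> byMode = new EnumMap<>(Mode.class);

        ReconcilerRegistry(Reconciler... reconcilers) {
            for (Reconciler reconciler : reconcilers) {
                Reconciler previous = byMode.put(reconciler.getMode(), reconciler);
                if (previous != null) {
                    throw new IllegalArgumentException(
                            "Duplicate reconciler for mode " + reconciler.getMode());
                }
            }
        }

        Reconciler get(Mode mode) {
            Reconciler reconciler = byMode.get(mode);
            if (reconciler == null) {
                throw new IllegalStateException("No reconciler registered for mode " + mode);
            }
            return reconciler;
        }
    }
}
```

The trade-off in this sketch is that all reconcilers are constructed up front, whereas the factory in the diff creates them only when a mode is first requested.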







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820114806



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(

Review comment:
       I considered this approach, but I do not quite see the use of `getMode`. So I lean toward keeping the current way: we only have to maintain the instantiation in the `ReconcilerFactory` and instantiate on demand.
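
The on-demand instantiation relies on `ConcurrentHashMap.computeIfAbsent`, which runs the mapping function only when the key has no value yet and otherwise returns the cached one. A small sketch of that behaviour, assuming a hypothetical `LazyCacheSketch` class with plain strings standing in for reconcilers:

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: strings stand in for reconciler instances. */
final class LazyCacheSketch {

    enum Mode { APPLICATION, SESSION }

    private final Map<Mode, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger creations = new AtomicInteger();

    /** Creates the value for a mode on first request and reuses it afterwards. */
    String getOrCreate(Mode mode) {
        return cache.computeIfAbsent(
                mode,
                m -> {
                    // Runs only when the mode has no cached value yet.
                    creations.incrementAndGet();
                    return m.name().toLowerCase(Locale.ROOT) + "-reconciler";
                });
    }

    /** Number of times the mapping function actually ran. */
    int creationCount() {
        return creations.get();
    }
}
```

Requesting the same mode twice leaves `creationCount()` at one, and a mode that is never requested is never instantiated.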







[GitHub] [flink-kubernetes-operator] gyfora merged pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora merged pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41


   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819808655



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {

Review comment:
       OK, we can keep it like this for now :)







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819823889



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {

Review comment:
       Would it be better to name the interface `Reconciler`? After all, `BaseReconciler` is the implementation of this interface.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059226676


   I introduced the enum `Mode` in this PR to distinguish the different run modes. It is for internal use only for now. If we later reach consensus on the modes the operator supports, we can make it a field of `FlinkDeployment`.
   
   cc @gyfora @wangyang0918 @tweise PTAL





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059785481


   > > I introduced the enum Mode in this PR to distinguish the different run modes. It is for internal use only for now. If we later reach consensus on the modes the operator supports, we can make it a field of FlinkDeployment.
   > 
   > In its present form I would prefer not to add it to the CRD. It is redundant: the presence or absence of the job spec provides the same information. I think a future standalone mode is orthogonal.
   
   Agreed. I have not added it to the CRD; it was created for internal use only for now.
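
To illustrate the point that the job spec already carries this information, the mode can be derived instead of stored. A minimal sketch with hypothetical stand-in types (`DeploymentSpec`, `JobSpec`); the real CRD classes are not reproduced here:

```java
/** Hypothetical sketch: the spec classes below are stand-ins, not the operator's CRD types. */
final class ModeInferenceSketch {

    enum Mode { APPLICATION, SESSION }

    /** Stand-in for a job spec; its contents do not matter for the inference. */
    static final class JobSpec {}

    /** Stand-in for a deployment spec whose job section is optional. */
    static final class DeploymentSpec {
        private final JobSpec job;

        DeploymentSpec(JobSpec job) {
            this.job = job;
        }

        JobSpec getJob() {
            return job;
        }
    }

    /** A job spec means an application deployment; its absence means a session cluster. */
    static Mode inferMode(DeploymentSpec spec) {
        return spec.getJob() != null ? Mode.APPLICATION : Mode.SESSION;
    }
}
```

Because the mode is a pure function of the spec, keeping it out of the CRD avoids a second field that could disagree with the job section.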





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819679829



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {
+
+    private final Map<Mode, FlinkReconciler> reconcilerMap = new ConcurrentHashMap<>();
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkReconcilerFactory factory;
+
+    public CompositeReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkReconcilerFactory factory) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.factory = factory;
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode)
+                .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode).cleanup(operatorNamespace, flinkApp, effectiveConfig);
+    }
+
+    private FlinkReconciler getOrCreateReconciler(Mode mode) {
+        return reconcilerMap.computeIfAbsent(
+                mode,
+                m -> factory.create(kubernetesClient, flinkService, operatorConfiguration, mode));
+    }
+
+    private Mode inferMode(FlinkDeployment flinkApp) {

Review comment:
       Got it, thanks. I will change the implementation.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819720657



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        case JOB:
+                            return new JobReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        default:
+                            throw new UnsupportedOperationException(
+                                    String.format("Unsupported running mode: %s", mode));
+                    }
+                });
+    }
+
+    private Mode getMode(FlinkDeployment flinkApp) {
+        return flinkApp.getSpec().getJob() != null ? Mode.JOB : Mode.SESSION;
+    }
+
+    enum Mode {
+        JOB,

Review comment:
       renamed







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820001216



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {

Review comment:
       Ditto, `Flink` in the name doesn't resonate. I would say either just `ReconcilerFactory` or `FlinkDeploymentReconcilerFactory`







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819819217



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        case APPLICATION:
+                            return new JobReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        default:

Review comment:
       The default branch could be removed because `getMode` only ever returns `APPLICATION` or `SESSION`; there is no other mode here.
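
One caveat, sketched under the assumption that the code keeps a classic `switch` statement: javac does not treat a switch statement over an enum as exhaustive, so once `default` is removed, a value-returning method or lambda still needs a statement after the switch. `SwitchSketch` and `describe` are hypothetical names used only for illustration.

```java
/** Hypothetical sketch of an enum switch statement without a default branch. */
final class SwitchSketch {

    enum Mode { APPLICATION, SESSION }

    static String describe(Mode mode) {
        switch (mode) {
            case SESSION:
                return "session reconciler";
            case APPLICATION:
                return "application reconciler";
        }
        // Still required: without it the compiler reports a missing return statement,
        // because it assumes the switch may match no case.
        throw new UnsupportedOperationException(
                String.format("Unsupported running mode: %s", mode));
    }
}
```

So dropping `default` mostly moves the fallback after the switch rather than removing it; either form also keeps a clear failure if a new mode is added later without a matching case.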







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819823889



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {

Review comment:
       Would it be better to name the interface `Reconciler`? After all, `BaseReconciler` is the implementation of this interface. The name `FlinkReconciler` does not read well to me.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820117650



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {
+
+    UpdateControl<FlinkDeployment> reconcile(

Review comment:
       updated







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820113901



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {

Review comment:
       will rename it







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819677696



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {
+
+    private final Map<Mode, FlinkReconciler> reconcilerMap = new ConcurrentHashMap<>();
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkReconcilerFactory factory;
+
+    public CompositeReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkReconcilerFactory factory) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.factory = factory;
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode)
+                .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode).cleanup(operatorNamespace, flinkApp, effectiveConfig);
+    }
+
+    private FlinkReconciler getOrCreateReconciler(Mode mode) {
+        return reconcilerMap.computeIfAbsent(
+                mode,
+                m -> factory.create(kubernetesClient, flinkService, operatorConfiguration, mode));
+    }
+
+    private Mode inferMode(FlinkDeployment flinkApp) {

Review comment:
       The factory can still distinguish the reconcilers and cache them, but at this point that should be an implementation detail of the factory.
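
A minimal sketch of that idea, under stated assumptions: `Deployment`, `Reconciler`, `SessionReconciler`, `JobReconciler` and `ReconcilerFactory` below are simplified, hypothetical stand-ins, not the operator's real classes. The factory decides which reconciler applies and keeps its cache private, so callers only ever see `getOrCreate`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the operator's types; not the real classes.
class Deployment {
    final String jobSpec; // null means "no job spec", i.e. a session cluster

    Deployment(String jobSpec) {
        this.jobSpec = jobSpec;
    }
}

interface Reconciler {
    String reconcile(Deployment deployment);
}

class SessionReconciler implements Reconciler {
    public String reconcile(Deployment deployment) {
        return "session";
    }
}

class JobReconciler implements Reconciler {
    public String reconcile(Deployment deployment) {
        return "job:" + deployment.jobSpec;
    }
}

class ReconcilerFactory {
    // The cache and its key type are private: an implementation detail.
    private final Map<Class<? extends Reconciler>, Reconciler> cache =
            new ConcurrentHashMap<>();

    Reconciler getOrCreate(Deployment deployment) {
        // The "mode" decision is a plain if branch inside the factory.
        if (deployment.jobSpec == null) {
            return cache.computeIfAbsent(
                    SessionReconciler.class, k -> new SessionReconciler());
        }
        return cache.computeIfAbsent(JobReconciler.class, k -> new JobReconciler());
    }
}
```

Because the key type never leaves the factory, it could later be swapped for something else without touching any caller.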







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820113889



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        case APPLICATION:
+                            return new JobReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        default:

Review comment:
       Yes, but the `default` branch is required, so I throw an exception there.
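
For readers wondering why the branch is required, here is a small sketch, assuming a hypothetical two-value `Mode` enum and a method that returns names instead of real reconciler instances. A classic `switch` statement is not checked for exhaustiveness over enum constants, so a method (or lambda body) that returns from every `case` still needs a `default` branch or a statement after the switch.

```java
// Hypothetical enum and method names, for illustration only.
enum Mode {
    SESSION,
    APPLICATION
}

class ModeSwitchSketch {
    static String reconcilerNameFor(Mode mode) {
        switch (mode) {
            case SESSION:
                return "SessionReconciler";
            case APPLICATION:
                return "JobReconciler";
            default:
                // Without this branch (or a throw after the switch), javac
                // reports "missing return statement", even though both enum
                // constants are handled above.
                throw new UnsupportedOperationException("Unsupported mode: " + mode);
        }
    }
}
```

On newer Java versions, a switch expression over an enum is checked for exhaustiveness, so the `default` could be dropped there; whether the project can use that depends on its target Java version.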







[GitHub] [flink-kubernetes-operator] tweise commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059628586


   > I introduced the enum Mode to distinguish the different run modes in this PR. It is for internal use for now; if we later reach consensus on the modes the operator supports, we can make it a field of FlinkDeployment then.
   
   In its present form I would prefer not to add it to the CRD. It is redundant: the presence or absence of the job spec provides the same information. I think a future standalone mode is orthogonal.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819720987



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/**
+ * The internal interface of reconciler, It aligns to the {@link
+ * io.javaoperatorsdk.operator.api.reconciler.Reconciler}.

Review comment:
       removed the extra comments.







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819822316



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(

Review comment:
       IMO, `BaseReconciler` could add an abstract method such as `getMode`, and each implementation could register its mapping with the `FlinkReconcilerFactory`. `getOrCreate` could then return the reconciler for a given mode.
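
One way this suggestion could look, as a rough sketch: the classes below are simplified, hypothetical stand-ins (no Kubernetes client, no `FlinkService`), and the `register` and `get` methods are assumptions, not part of the PR.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the operator's real classes.
enum Mode {
    SESSION,
    APPLICATION
}

abstract class BaseReconciler {
    // Each concrete reconciler declares which mode it handles.
    abstract Mode getMode();
}

class SessionReconciler extends BaseReconciler {
    Mode getMode() {
        return Mode.SESSION;
    }
}

class JobReconciler extends BaseReconciler {
    Mode getMode() {
        return Mode.APPLICATION;
    }
}

class RegistryFactory {
    // EnumMap is not thread-safe; registration is assumed to happen once at startup.
    private final Map<Mode, BaseReconciler> registry = new EnumMap<>(Mode.class);

    void register(BaseReconciler reconciler) {
        registry.put(reconciler.getMode(), reconciler);
    }

    BaseReconciler get(Mode mode) {
        BaseReconciler reconciler = registry.get(mode);
        if (reconciler == null) {
            throw new IllegalArgumentException("No reconciler registered for " + mode);
        }
        return reconciler;
    }
}
```

With this shape the factory has no `switch` at all; adding a mode means registering one more reconciler.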







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059785574


   I have addressed your comments, @tweise @SteNicholas. Please take another look.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059418252


   LGTM, let's give some time for others to chime in with any comments before merging :) 





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819666628



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/**
+ * The internal interface of reconciler, It aligns to the {@link
+ * io.javaoperatorsdk.operator.api.reconciler.Reconciler}.
+ */
+public interface FlinkReconciler {
+
+    UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    default DeleteControl cleanup(

Review comment:
       Why does this have a default, basically empty implementation? I think we should keep it abstract (required).
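
To illustrate the concern, a small sketch with hypothetical, heavily simplified interfaces (the real `cleanup` takes several parameters and returns a `DeleteControl`): a `default` body lets an implementation compile without ever considering cleanup, while an abstract method forces every implementation to decide.

```java
// Hypothetical, simplified interfaces; the real methods take more parameters.
interface LenientReconciler {
    // A default body lets implementers silently skip cleanup.
    default String cleanup() {
        return "no-op";
    }
}

interface StrictReconciler {
    // Abstract: every implementation must decide what cleanup means.
    String cleanup();
}

class ForgetfulReconciler implements LenientReconciler {
    // Compiles even though cleanup was never considered.
}

class CarefulReconciler implements StrictReconciler {
    @Override
    public String cleanup() {
        return "deleted cluster resources";
    }
}
```

Removing `cleanup` from `CarefulReconciler` would be a compile error, which is the safety net the comment asks for.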

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {
+
+    private final Map<Mode, FlinkReconciler> reconcilerMap = new ConcurrentHashMap<>();
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkReconcilerFactory factory;
+
+    public CompositeReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkReconcilerFactory factory) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.factory = factory;
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode)
+                .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode).cleanup(operatorNamespace, flinkApp, effectiveConfig);
+    }
+
+    private FlinkReconciler getOrCreateReconciler(Mode mode) {
+        return reconcilerMap.computeIfAbsent(
+                mode,
+                m -> factory.create(kubernetesClient, flinkService, operatorConfiguration, mode));
+    }
+
+    private Mode inferMode(FlinkDeployment flinkApp) {

Review comment:
       I don't really see the value of having a Mode enum; it just adds unnecessary complexity. This simple if branch could be part of the factory logic, and then we wouldn't be restricted to a hardcoded set of modes.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {

Review comment:
       I think if we get rid of the mode, we don't even need the CompositeReconciler and can use the ReconcilerFactory in its place.
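
A sketch of what dropping the composite could mean for the caller, assuming hypothetical stand-in types (`FlinkApp`, `AppReconciler`, `Controller`) and modelling the factory as a plain `Function`: the controller asks the factory for a reconciler and delegates to it directly, with no intermediate wrapper class.

```java
import java.util.function.Function;

// Hypothetical stand-ins; not the operator's real classes.
class FlinkApp {
    final boolean hasJob; // true when a job spec is present

    FlinkApp(boolean hasJob) {
        this.hasJob = hasJob;
    }
}

interface AppReconciler {
    String reconcile(FlinkApp app);
}

class Controller {
    // The factory is modelled as a function from deployment to reconciler.
    private final Function<FlinkApp, AppReconciler> reconcilerFactory;

    Controller(Function<FlinkApp, AppReconciler> reconcilerFactory) {
        this.reconcilerFactory = reconcilerFactory;
    }

    String reconcile(FlinkApp app) {
        // Pick the reconciler and delegate in one step; no composite needed.
        return reconcilerFactory.apply(app).reconcile(app);
    }
}
```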







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819824131



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {
+
+    UpdateControl<FlinkDeployment> reconcile(

Review comment:
       Please add a comment for the interface.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {

Review comment:
       Would it be better to name the interface `Reconciler`, since `BaseReconciler` is the implementation of this interface? cc @gyfora







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#issuecomment-1059301201


   Thanks for your suggestions; I have addressed your comments. Please take another look, @gyfora.





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819824335



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {
+
+    UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    DeleteControl cleanup(

Review comment:
       Ditto.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819673121



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/**
+ * The internal interface of reconciler, It aligns to the {@link
+ * io.javaoperatorsdk.operator.api.reconciler.Reconciler}.
+ */
+public interface FlinkReconciler {
+
+    UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    default DeleteControl cleanup(

Review comment:
       This aligns with `io.javaoperatorsdk.operator.api.reconciler.Reconciler`, which provides a `default` cleanup. I will remove it.
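
For readers unfamiliar with the `default` keyword under discussion, here is a minimal sketch of how a default interface method behaves. Every name in it (`DefaultCleanupSketch`, `CleanupResult`, the simplified `Reconciler` signature) is a hypothetical stand-in, not the operator's or the SDK's real API.

```java
/** Hypothetical sketch: a default method lets implementations skip cleanup(). */
public class DefaultCleanupSketch {

    /** Stand-in for a cleanup outcome; not the SDK's DeleteControl. */
    enum CleanupResult {
        DELETE_NOW,
        RETRY_LATER
    }

    /** Simplified reconciler interface with a default cleanup. */
    interface Reconciler {
        String reconcile(String resource);

        // Implementations inherit this unless they override it.
        default CleanupResult cleanup(String resource) {
            return CleanupResult.DELETE_NOW;
        }
    }

    /** Relies on the inherited default cleanup. */
    static class SessionReconciler implements Reconciler {
        @Override
        public String reconcile(String resource) {
            return "session:" + resource;
        }
    }

    /** Overrides cleanup with its own behavior. */
    static class JobReconciler implements Reconciler {
        @Override
        public String reconcile(String resource) {
            return "job:" + resource;
        }

        @Override
        public CleanupResult cleanup(String resource) {
            return CleanupResult.RETRY_LATER;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SessionReconciler().cleanup("demo"));
        System.out.println(new JobReconciler().cleanup("demo"));
    }
}
```

Removing `default` from the interface, as proposed in the comment, would force every implementation to declare `cleanup` explicitly.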







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819671785



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {

Review comment:
       Got it, I will simplify it.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819676433



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {
+
+    private final Map<Mode, FlinkReconciler> reconcilerMap = new ConcurrentHashMap<>();
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkReconcilerFactory factory;
+
+    public CompositeReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkReconcilerFactory factory) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.factory = factory;
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode)
+                .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode).cleanup(operatorNamespace, flinkApp, effectiveConfig);
+    }
+
+    private FlinkReconciler getOrCreateReconciler(Mode mode) {
+        return reconcilerMap.computeIfAbsent(
+                mode,
+                m -> factory.create(kubernetesClient, flinkService, operatorConfiguration, mode));
+    }
+
+    private Mode inferMode(FlinkDeployment flinkApp) {

Review comment:
       It acts as the lookup key in the `reconcilerMap`, so we can choose the target reconciler according to the `FlinkDeploymentApp` mode. My thought is that we should not have to create a `Reconciler` on each loop, so I use a keyed map to store it.
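
The caching idea described in this comment can be sketched roughly as follows. This is a simplified illustration, assuming a one-argument `Reconciler` and a hypothetical `LazyReconcilerCache` class; only `Mode`, `reconcilerMap` and the use of `computeIfAbsent` come from the quoted diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a per-mode reconciler cache; not the operator's real class. */
public class LazyReconcilerCache {

    enum Mode {
        JOB,
        SESSION
    }

    /** Simplified stand-in: the real interface takes more arguments. */
    interface Reconciler {
        String reconcile(String appName);
    }

    private final Map<Mode, Reconciler> reconcilerMap = new ConcurrentHashMap<>();

    // Counts how many reconcilers were actually constructed (for illustration only).
    private final AtomicInteger created = new AtomicInteger();

    /** Returns the cached reconciler for the mode, creating it on first use. */
    Reconciler getOrCreate(Mode mode) {
        return reconcilerMap.computeIfAbsent(
                mode,
                m -> {
                    created.incrementAndGet();
                    return appName -> m + ":" + appName;
                });
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        LazyReconcilerCache cache = new LazyReconcilerCache();
        Reconciler first = cache.getOrCreate(Mode.JOB);
        Reconciler second = cache.getOrCreate(Mode.JOB);
        System.out.println(first == second); // the same instance is reused
        System.out.println(cache.createdCount());
    }
}
```

With this shape, the reconcile loop pays the construction cost at most once per mode, which is the motivation given above.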







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819678270



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/CompositeReconciler.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The composite reconciler which will use the target reconciler based on the app mode. */
+public class CompositeReconciler implements FlinkReconciler {
+
+    private final Map<Mode, FlinkReconciler> reconcilerMap = new ConcurrentHashMap<>();
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkReconcilerFactory factory;
+
+    public CompositeReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkReconcilerFactory factory) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.factory = factory;
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode)
+                .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        Mode mode = inferMode(flinkApp);
+        return getOrCreateReconciler(mode).cleanup(operatorNamespace, flinkApp, effectiveConfig);
+    }
+
+    private FlinkReconciler getOrCreateReconciler(Mode mode) {
+        return reconcilerMap.computeIfAbsent(
+                mode,
+                m -> factory.create(kubernetesClient, flinkService, operatorConfiguration, mode));
+    }
+
+    private Mode inferMode(FlinkDeployment flinkApp) {

Review comment:
       Basically your CompositeReconciler + factory logic should be The Factory :) 







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819713248



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/**
+ * The internal interface of reconciler, It aligns to the {@link
+ * io.javaoperatorsdk.operator.api.reconciler.Reconciler}.

Review comment:
       This is a bit different from the operator SDK reconciler: in our case we have a validator, an observer and a reconciler, so it covers only a subset of the responsibilities.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {

Review comment:
       Since we have only 2 reconcilers and no resource implications, I would prefer to instantiate and populate the map in the constructor.
   
   This way we can avoid any errors after the reconciler has started.
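
A rough sketch of the eager alternative suggested here, under the assumption that both reconcilers are cheap to build. The class name `EagerReconcilerFactory`, the `boolean hasJobSpec` parameter and the one-method `Reconciler` are hypothetical simplifications; the real factory would take a `FlinkDeployment` and construct `JobReconciler` and `SessionReconciler`.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch: all reconcilers are built once, in the constructor. */
public class EagerReconcilerFactory {

    enum Mode {
        JOB,
        SESSION
    }

    /** Simplified stand-in for the reconciler interface. */
    interface Reconciler {
        String name();
    }

    private final Map<Mode, Reconciler> reconcilers;

    EagerReconcilerFactory() {
        // Any construction failure surfaces here, at startup, not during reconciliation.
        Map<Mode, Reconciler> map = new EnumMap<>(Mode.class);
        map.put(Mode.JOB, () -> "job");
        map.put(Mode.SESSION, () -> "session");
        this.reconcilers = Collections.unmodifiableMap(map);
    }

    /** Pure lookup; nothing is created after construction. */
    Reconciler get(boolean hasJobSpec) {
        Mode mode = hasJobSpec ? Mode.JOB : Mode.SESSION;
        return reconcilers.get(mode);
    }

    public static void main(String[] args) {
        EagerReconcilerFactory factory = new EagerReconcilerFactory();
        System.out.println(factory.get(true).name());
        System.out.println(factory.get(false).name());
    }
}
```

The trade-off against the lazy `computeIfAbsent` version is that both instances always exist, in exchange for a lookup path that cannot fail after startup.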

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        case JOB:
+                            return new JobReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        default:
+                            throw new UnsupportedOperationException(
+                                    String.format("Unsupported running mode: %s", mode));
+                    }
+                });
+    }
+
+    private Mode getMode(FlinkDeployment flinkApp) {
+        return flinkApp.getSpec().getJob() != null ? Mode.JOB : Mode.SESSION;
+    }
+
+    enum Mode {
+        JOB,

Review comment:
       Instead of `JOB` we could call it `APPLICATION` to better represent the deployment mode.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820000654



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconciler.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The internal interface of reconciler. */
+public interface FlinkReconciler {

Review comment:
       +1, I think `Flink` is redundant







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r819723451



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {

Review comment:
       I lean toward keeping the lazy instantiation behavior. If we instantiate and populate the map in the constructor, the `ReconcilerFactory` loses its purpose: it will behave like the previous code, with two pre-instantiated reconcilers.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #41: [FLINK-26377] Extract the Reconciler interface

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #41:
URL: https://github.com/apache/flink-kubernetes-operator/pull/41#discussion_r820117621



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/FlinkReconcilerFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class FlinkReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, FlinkReconciler> reconcilerMap;
+
+    public FlinkReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkReconciler getOrCreate(FlinkDeployment flinkApp) {

Review comment:
       Renamed to `ReconcilerFactory`.



