Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/14 11:49:56 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

gyfora opened a new pull request, #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269

   This PR introduces a new interface called FlinkResourceListener that allows operator users to listen to resource status changes and events triggered by the operator itself.
   
   This provides a key integration point for platforms built on top of the operators. Similar to custom validators or metrics reporters, FlinkResourceListener implementations are also loaded using the built in plugin mechanism.
   
   The listeners are integrated into the StatusHelper and the newly introduced EventHelper objects, which will seamlessly forward all updates/events without added complexity.
   
   The PR also improves event triggering and simplifies some test cases that were affected by this change.
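
A minimal sketch of the forwarding idea described above, assuming simplified stand-in types. `Resource`, `ResourceListener`, `AuditListener` and `updateStatus` are hypothetical names used only for illustration; the real `FlinkResourceListener` interface and the plugin loading are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerSketch {

    // Stand-in for a Flink custom resource (hypothetical, reduced to a name and a status).
    public static class Resource {
        public final String name;
        public String status;

        public Resource(String name, String status) {
            this.name = name;
            this.status = status;
        }
    }

    // Simplified stand-in for a resource listener; the real interface differs.
    public interface ResourceListener {
        void onStatusUpdate(Resource resource, String previousStatus);
    }

    // Example listener that keeps an in-memory record of every status transition.
    public static class AuditListener implements ResourceListener {
        public final List<String> log = new ArrayList<>();

        @Override
        public void onStatusUpdate(Resource resource, String previousStatus) {
            log.add(resource.name + ": " + previousStatus + " -> " + resource.status);
        }
    }

    // Stand-in for the status helper: applies the update, then forwards it to all listeners.
    public static void updateStatus(
            Resource resource, String newStatus, List<? extends ResourceListener> listeners) {
        String previous = resource.status;
        resource.status = newStatus;
        listeners.forEach(listener -> listener.onStatusUpdate(resource, previous));
    }
}
```

The caller only talks to the helper; listeners are notified as a side effect of the update, which is what keeps the call sites free of listener-specific code.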


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901125430


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   We could probably request some JOSDK API support for this feature. What do you think @csviri?





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r900884753


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Flink resource listener utilities. */
+public class ListenerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
+    private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
+    private static final String SUFFIX = ".class";
+    private static final Pattern PTN =
+            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    public static Collection<FlinkResourceListener> discoverListeners(

Review Comment:
   javadoc to summarize how we find listeners?
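
For context, a small sketch of which configuration keys the `PTN` pattern in the hunk above accepts. The constants are copied from the diff; the `listenerClasses` helper, the class name `ListenerKeySketch` and the sample keys in the usage note are assumptions for illustration, and what `discoverListeners` does with the matches is not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListenerKeySketch {

    static final String PREFIX = "kubernetes.operator.plugins.listeners.";
    static final String SUFFIX = ".class";
    // Same expression as in the diff: the captured name is non-whitespace and contains no dots.
    static final Pattern PTN =
            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));

    /** Returns listener name -> configured value for every key that matches PTN. */
    public static Map<String, String> listenerClasses(Map<String, String> conf) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            Matcher matcher = PTN.matcher(entry.getKey());
            if (matcher.matches()) {
                result.put(matcher.group(1), entry.getValue());
            }
        }
        return result;
    }
}
```

With this pattern a key such as `kubernetes.operator.plugins.listeners.audit.class` yields the name `audit`, while `kubernetes.operator.plugins.listeners.audit.endpoint` and `kubernetes.operator.plugins.listeners.a.b.class` are not treated as listener declarations.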





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901131989


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   `EventRecorder` and `StatusRecorder` sound perfect, thanks @morhidi, will make this change





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901106075


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   I am a bit torn here, this class doesn't emit events on its own. So maybe `EventUtil` would be better if we don't like the `helper` name. We also have `StatusHelper` now, which could also be renamed to `StatusUtil`. These classes create an interface between the operator and the Kubernetes client so we can control these interactions better.





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901113817


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   Maybe we can look at it from the client perspective rather than how it is implemented? It functions as a kind of "sink" for events.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#issuecomment-1159779592

   @tweise @morhidi updated the PR according to the comments:
    - Rename EventHelper -> EventRecorder, StatusHelper -> StatusRecorder
    - Propagate listener errors
    - Add doc page for implementing listeners




[GitHub] [flink-kubernetes-operator] gyfora merged pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269




[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901104055


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   `EventHelper` is a bit awkward from a naming perspective, since this is a pluggable component that needs to be injected everywhere. I see that you want to separate the creation of the event from the listener. Would `EventEmitter` be a fit for this?





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901114590


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHelper.class);
+
+    private final KubernetesClient client;
+    private final BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener;
+
+    public EventHelper(
+            KubernetesClient client, BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener) {
+        this.client = client;
+        this.eventListener = eventListener;
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            EventUtils.Type type,
+            String reason,
+            String message,
+            EventUtils.Component component) {
+        return EventUtils.createOrUpdateEvent(

Review Comment:
   It would also be possible to handle the k8s event create/update as listener implementation, but perhaps that is overkill when the intention is to keep it hard wired?
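
The two options can be contrasted with a rough sketch. All types here are stand-ins: `Event` is a reduced placeholder for the fabric8 model, the `cluster` list stands in for the Kubernetes API, and `HardWiredRecorder` / `ListenerOnlyRecorder` are hypothetical names. The ordering inside `triggerEvent` is an assumption, since the diff above is truncated.

```java
import java.util.List;
import java.util.function.BiConsumer;

public class EventSinkSketch {

    // Reduced placeholder for an event (hypothetical, two fields only).
    public static class Event {
        public final String reason;
        public final String message;

        public Event(String reason, String message) {
            this.reason = reason;
            this.message = message;
        }
    }

    // Option 1: the Kubernetes write is hard wired, one listener callback is notified afterwards.
    public static class HardWiredRecorder {
        private final List<String> cluster; // stand-in for the Kubernetes API
        private final BiConsumer<String, Event> eventListener;

        public HardWiredRecorder(List<String> cluster, BiConsumer<String, Event> eventListener) {
            this.cluster = cluster;
            this.eventListener = eventListener;
        }

        public void triggerEvent(String resource, String reason, String message) {
            Event event = new Event(reason, message);
            cluster.add(resource + "/" + reason); // always executed
            eventListener.accept(resource, event);
        }
    }

    // Option 2: the Kubernetes write is just one more listener in the list.
    public static class ListenerOnlyRecorder {
        private final List<BiConsumer<String, Event>> listeners;

        public ListenerOnlyRecorder(List<BiConsumer<String, Event>> listeners) {
            this.listeners = listeners;
        }

        public void triggerEvent(String resource, String reason, String message) {
            Event event = new Event(reason, message);
            listeners.forEach(listener -> listener.accept(resource, event));
        }
    }
}
```

Option 2 is more uniform, but it makes the Kubernetes event write depend on how the listener list is assembled, which is the trade-off behind keeping it hard wired.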





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901132589


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java:
##########
@@ -122,4 +144,45 @@ public <T extends CustomResource<?, STATUS>> void removeCachedStatus(T resource)
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
         return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
     }
+
+    public static <S extends CommonStatus<?>> StatusHelper<S> create(
+            KubernetesClient kubernetesClient, Collection<FlinkResourceListener> listeners) {
+        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+                (resource, previousStatus) -> {
+                    var ctx =
+                            new FlinkResourceListener.StatusUpdateContext() {
+                                @Override
+                                public S getPreviousStatus() {
+                                    return previousStatus;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, S> getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return kubernetesClient;
+                                }
+                            };
+
+                    listeners.forEach(
+                            listener -> {
+                                try {
+                                    if (resource instanceof FlinkDeployment) {
+                                        listener.onDeploymentStatusUpdate(ctx);
+                                    } else {
+                                        listener.onSessionJobStatusUpdate(ctx);
+                                    }
+                                } catch (Exception e) {
+                                    LOG.error(

Review Comment:
   You are probably right, it might be a key integration point, propagating the error makes sense
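
To make the difference concrete, a minimal sketch of the two error-handling strategies, using plain `Consumer` callbacks as stand-ins for listeners. `notifyAndLog` and `notifyAndPropagate` are hypothetical helper names, and collecting messages into a list stands in for `LOG.error(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerErrorSketch {

    /** Catches each listener failure and carries on, like the try/catch in the hunk above. */
    public static <T> List<String> notifyAndLog(List<Consumer<T>> listeners, T ctx) {
        List<String> errors = new ArrayList<>();
        for (Consumer<T> listener : listeners) {
            try {
                listener.accept(ctx);
            } catch (Exception e) {
                errors.add(e.getMessage()); // stand-in for LOG.error(...)
            }
        }
        return errors;
    }

    /** Lets the first listener failure propagate to the caller; later listeners are not called. */
    public static <T> void notifyAndPropagate(List<Consumer<T>> listeners, T ctx) {
        for (Consumer<T> listener : listeners) {
            listener.accept(ctx);
        }
    }
}
```

With propagation, a failing listener surfaces to whoever triggered the update, so a platform that depends on the listener is not silently skipped. The cost is that one faulty plugin can interrupt the update path.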





[GitHub] [flink-kubernetes-operator] csviri commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
csviri commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901287441


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   `EventRecorder` makes sense for sure. As for `StatusRecorder`, I think we discussed before that the state should rather be recorded in another resource (in general).
   But please create an issue that describes the use cases and/or motivation for JOSDK! Eventually EventRecorder will be supported for sure. We can discuss the others.
   
   If you think the `FlinkResourceListener` makes sense to generalize, please describe the motivation behind that too.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r900951149


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Flink resource listener utilities. */
+public class ListenerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
+    private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
+    private static final String SUFFIX = ".class";
+    private static final Pattern PTN =
+            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    public static Collection<FlinkResourceListener> discoverListeners(

Review Comment:
   Will do. I will also add a docs page once we have agreed on the initial interface.





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901125062


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {

Review Comment:
   It is called EventRecorder in kubebuilder:
   https://book-v1.book.kubebuilder.io/beyond_basics/creating_events.html





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r900884502


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Flink resource listener utilities. */
+public class ListenerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
+    private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
+    private static final String SUFFIX = ".class";
+    private static final Pattern PTN =
+            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    public static Collection<FlinkResourceListener> discoverListeners(

Review Comment:
   Could you add a javadoc summarizing how we find listeners?
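
   For illustration, a minimal sketch of the part of the lookup that can be read off the quoted constants: the pattern matches keys of the form `kubernetes.operator.plugins.listeners.<name>.class` and captures `<name>`, which may not contain dots or whitespace. The rest of `discoverListeners` is not quoted here, so the class name `ListenerKeySketch`, the method `listenerClasses`, and the plain `Map` standing in for the Flink `Configuration` are assumptions, not the PR's code.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: map listener names to class names from flat config keys. */
final class ListenerKeySketch {

    // Same constants and pattern as in the diff under review.
    static final String PREFIX = "kubernetes.operator.plugins.listeners.";
    static final String SUFFIX = ".class";
    static final Pattern PTN =
            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));

    /** Returns listener name -> configured class name, sorted by name. */
    static Map<String, String> listenerClasses(Map<String, String> conf) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            Matcher m = PTN.matcher(e.getKey());
            // matches() requires the whole key to fit; the captured name excludes dots.
            if (m.matches()) {
                result.put(m.group(1), e.getValue());
            }
        }
        return result;
    }
}
```

   With this pattern, a key such as `kubernetes.operator.plugins.listeners.audit.class` yields the name `audit`, while other keys under the same prefix (for example a hypothetical `...listeners.audit.topic`) are ignored by the name lookup.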





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901111930


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java:
##########
@@ -122,4 +144,45 @@ public <T extends CustomResource<?, STATUS>> void removeCachedStatus(T resource)
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
         return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
     }
+
+    public static <S extends CommonStatus<?>> StatusHelper<S> create(
+            KubernetesClient kubernetesClient, Collection<FlinkResourceListener> listeners) {
+        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+                (resource, previousStatus) -> {
+                    var ctx =
+                            new FlinkResourceListener.StatusUpdateContext() {
+                                @Override
+                                public S getPreviousStatus() {
+                                    return previousStatus;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, S> getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return kubernetesClient;
+                                }
+                            };
+
+                    listeners.forEach(
+                            listener -> {
+                                try {
+                                    if (resource instanceof FlinkDeployment) {
+                                        listener.onDeploymentStatusUpdate(ctx);
+                                    } else {
+                                        listener.onSessionJobStatusUpdate(ctx);
+                                    }
+                                } catch (Exception e) {
+                                    LOG.error(

Review Comment:
   Shouldn't this propagate the exception, since it is an error that needs to be fixed rather than swallowed? Otherwise WARN would be the more appropriate log level.
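
   To make the trade-off concrete, here is a minimal sketch of both options. It uses a plain `Consumer` in place of `FlinkResourceListener`, and the names `ListenerDispatchSketch`, `notifyIsolated`, and `notifyPropagating` are hypothetical; it is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: two ways to handle a failing listener callback. */
final class ListenerDispatchSketch {

    /** Option A: isolate failures so one listener cannot block the others. */
    static <C> List<Exception> notifyIsolated(List<Consumer<C>> listeners, C ctx) {
        List<Exception> failures = new ArrayList<>();
        for (Consumer<C> listener : listeners) {
            try {
                listener.accept(ctx);
            } catch (Exception e) {
                // Real code would log here; the failure is collected instead of rethrown.
                failures.add(e);
            }
        }
        return failures;
    }

    /** Option B: propagate, so the first failure aborts and later listeners are skipped. */
    static <C> void notifyPropagating(List<Consumer<C>> listeners, C ctx) {
        for (Consumer<C> listener : listeners) {
            listener.accept(ctx);
        }
    }
}
```

   Option A keeps status updates flowing when a plugin misbehaves, at the cost of hiding the failure unless someone reads the logs. Option B surfaces the failure immediately, but lets one faulty listener affect every other listener and the caller.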





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901135147


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for status management and updates. */
+public class EventHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHelper.class);
+
+    private final KubernetesClient client;
+    private final BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener;
+
+    public EventHelper(
+            KubernetesClient client, BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener) {
+        this.client = client;
+        this.eventListener = eventListener;
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            EventUtils.Type type,
+            String reason,
+            String message,
+            EventUtils.Component component) {
+        return EventUtils.createOrUpdateEvent(

Review Comment:
   I feel that would be slightly weird because the listener is actually listening to the events :) Technically, we could create the event and add a hardwired listener that sends it to Kubernetes, but I don't think that would make the logic easier to understand.
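
   As a rough sketch of the shape being defended here: the helper writes the event itself and then hands it to the listener callback. All types below are stand-ins (`Event`, the `store` consumer, and a `String` resource name are hypothetical), and the store-then-notify order is an assumption, since the body of `triggerEvent` is cut off in the quoted diff.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical sketch with stand-in types; none of these are the operator's real classes. */
final class EventFlowSketch {

    /** Stand-in for a Kubernetes Event. */
    static final class Event {
        final String reason;
        final String message;

        Event(String reason, String message) {
            this.reason = reason;
            this.message = message;
        }
    }

    private final Consumer<Event> store; // stand-in for writing the event to Kubernetes
    private final BiConsumer<String, Event> eventListener; // stand-in for the listener callback

    EventFlowSketch(Consumer<Event> store, BiConsumer<String, Event> eventListener) {
        this.store = store;
        this.eventListener = eventListener;
    }

    /** Persists the event first, then forwards it to the listener (assumed order). */
    Event triggerEvent(String resourceName, String reason, String message) {
        Event event = new Event(reason, message);
        store.accept(event);
        eventListener.accept(resourceName, event);
        return event;
    }
}
```

   The alternative mentioned above would drop the `store` field and register the Kubernetes write as just another listener, which makes the persistence step look optional even though it is not.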





[GitHub] [flink-kubernetes-operator] tweise commented on pull request #269: [FLINK-27688] Add pluggable FlinkResourceListener interface

Posted by GitBox <gi...@apache.org>.
tweise commented on PR #269:
URL: https://github.com/apache/flink-kubernetes-operator/pull/269#issuecomment-1159739020

   @gyfora overall LGTM. Some details remain to be discussed, and the docs may need a bit more work. I could not easily find a description stating that the listeners provide a way to emit the events in addition to storing them in k8s, which is key to the k8s native integration.

