Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/18 07:44:35 UTC

[GitHub] [flink] rmetzger commented on a change in pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager

rmetzger commented on a change in pull request #14678:
URL: https://github.com/apache/flink/pull/14678#discussion_r559354094



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
##########
@@ -98,11 +103,22 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
                 true);
     }
 
+    /** @param failureListener the failure listener to be registered */
+    public void registerFailureListener(FailureListener failureListener) {
+        if (!failureListeners.contains(failureListener)) {
+            failureListeners.add(failureListener);

Review comment:
       Doesn't `HashSet.add()` already add an element only if it isn't present? If so, the `contains()` check is redundant.
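Here is a minimal sketch of the point. It assumes a simplified registry rather than the PR's actual class; `FailureCallback`, `CallbackRegistry` and `CallbackRegistryDemo` are hypothetical names. `Set.add()` returns `false` and leaves the set unchanged when an equal element is already present. Its return value can therefore replace the separate `contains()` check.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the PR's FailureListener interface. */
interface FailureCallback {
    void onFailure(Throwable cause);
}

/** Hypothetical registry showing that Set.add() already deduplicates. */
class CallbackRegistry {
    private final Set<FailureCallback> callbacks = new HashSet<>();

    /** Returns true if the callback was newly registered, false if it was already present. */
    boolean register(FailureCallback callback) {
        // No contains() guard needed: add() does nothing for an element already in the set.
        return callbacks.add(callback);
    }

    int size() {
        return callbacks.size();
    }
}

class CallbackRegistryDemo {
    public static void main(String[] args) {
        CallbackRegistry registry = new CallbackRegistry();
        FailureCallback logging = cause -> System.out.println("failure: " + cause);

        System.out.println(registry.register(logging)); // true
        System.out.println(registry.register(logging)); // false, already registered
        System.out.println(registry.size());            // 1
    }
}
```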

##########
File path: flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.executiongraph.FailureListener
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.executiongraph.DefaultFailureListener

Review comment:
       See my comment on the `ServiceLoader` usage in `FailureListenerFactory`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
##########
@@ -49,6 +51,8 @@
     /** Number of all restarts happened since this job is submitted. */
     private long numberOfRestarts;
 
+    private Set<FailureListener> failureListeners;

Review comment:
       ```suggestion
       private final Set<FailureListener> failureListeners;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -172,6 +176,14 @@
                         .createInstance(new DefaultExecutionSlotAllocationContext());
 
         this.verticesWaitingForRestart = new HashSet<>();
+
+        List<FailureListener> listeners =
+                failureListenerFactory.createFailureListener(jobManagerJobMetricGroup);
+
+        for (FailureListener listener : listeners) {
+            executionFailureHandler.registerFailureListener(listener);
+        }

Review comment:
       Since this loop executes code not controlled by the framework, I would recommend catching Throwables and returning them as an unrecoverable FailureHandlingResult.
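One possible shape for this, sketched under assumptions: `UserFailureListener`, `HandlingResult` and `ListenerInvoker.notifyListeners` are hypothetical stand-ins, not Flink's `FailureHandlingResult` API. Wherever listeners are invoked, each call is wrapped so that a throwable from user code becomes an unrecoverable result instead of escaping into the scheduler. JVM-level errors (`VirtualMachineError`, e.g. `OutOfMemoryError`) are rethrown rather than swallowed.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a user-provided failure listener. */
interface UserFailureListener {
    void onFailure(Throwable cause);
}

/** Hypothetical stand-in for a failure handling result. */
final class HandlingResult {
    final boolean recoverable;
    final Throwable error;

    private HandlingResult(boolean recoverable, Throwable error) {
        this.recoverable = recoverable;
        this.error = error;
    }

    static HandlingResult restartable() {
        return new HandlingResult(true, null);
    }

    static HandlingResult unrecoverable(Throwable error) {
        return new HandlingResult(false, error);
    }
}

final class ListenerInvoker {
    /** Notifies all listeners; any throwable from user code makes the result unrecoverable. */
    static HandlingResult notifyListeners(List<UserFailureListener> listeners, Throwable cause) {
        for (UserFailureListener listener : listeners) {
            try {
                listener.onFailure(cause);
            } catch (VirtualMachineError fatal) {
                // Do not swallow errors such as OutOfMemoryError.
                throw fatal;
            } catch (Throwable t) {
                // Code outside the framework's control failed: stop and report it.
                return HandlingResult.unrecoverable(t);
            }
        }
        return HandlingResult.restartable();
    }

    public static void main(String[] args) {
        UserFailureListener ok = cause -> {};
        UserFailureListener broken = cause -> {
            throw new IllegalStateException("listener bug");
        };

        Throwable failure = new RuntimeException("task failed");
        System.out.println(notifyListeners(Arrays.asList(ok), failure).recoverable);         // true
        System.out.println(notifyListeners(Arrays.asList(ok, broken), failure).recoverable); // false
    }
}
```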

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListenerFactory.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+
+/** Factory class for creating {@link FailureListener} with plugin Manager. */
+public class FailureListenerFactory {
+    private PluginManager pluginManager;
+
+    public FailureListenerFactory(Configuration configuration) {
+        this.pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
+    }
+
+    public List<FailureListener> createFailureListener(JobManagerJobMetricGroup metricGroup) {

Review comment:
       Could we do the discovery and initialization of the implementations in the constructor instead? The available implementations won't change while the process is running, so why re-initialize the listeners over and over again?
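Here is a sketch of that idea. It assumes discovery can be expressed as a `Supplier<Iterator<T>>`, a hypothetical stand-in for the `ServiceLoader`/`PluginManager` lookups in the PR; `CachingListenerFactory` is likewise a hypothetical name. The constructor consumes the iterator once and keeps the result. Later calls only reuse the cached list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical factory that discovers implementations once, at construction time. */
final class CachingListenerFactory<T> {
    private final List<T> discovered;

    CachingListenerFactory(Supplier<Iterator<T>> discovery) {
        List<T> found = new ArrayList<>();
        // Discovery runs exactly once; the set of implementations is fixed for the process lifetime.
        discovery.get().forEachRemaining(found::add);
        this.discovered = Collections.unmodifiableList(found);
    }

    /** Returns the cached implementations without re-running discovery. */
    List<T> getListeners() {
        return discovered;
    }
}

final class CachingListenerFactoryDemo {
    public static void main(String[] args) {
        int[] discoveryRuns = {0};
        CachingListenerFactory<String> factory =
                new CachingListenerFactory<String>(() -> {
                    discoveryRuns[0]++;
                    return Arrays.asList("default", "plugin").iterator();
                });

        factory.getListeners();
        factory.getListeners();
        System.out.println(discoveryRuns[0]);              // 1
        System.out.println(factory.getListeners().size()); // 2
    }
}
```

If the cached instances are handed to every job, the listeners must be safe to share across jobs. If they hold per-job state (for example metrics registered through `init(metricGroup)`), the cached part could instead be the discovered classes or factories, with a fresh instance created per job.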

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListener.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+/** Failure listener to customize the behavior for each type of failures tracked in job manager. */
+public interface FailureListener extends Plugin {
+
+    /**
+     * Initialize the listener with JobManagerJobMetricGroup.
+     *
+     * @param metricGroup metrics group that the listener can add customized metrics definition.
+     */
+    void init(JobManagerJobMetricGroup metricGroup);
+
+    /**
+     * Method to handle each of failures in the listener.

Review comment:
       ```suggestion
        * Method to handle a failure in the listener.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListenerFactory.java
##########
@@ -0,0 +1,57 @@
+public class FailureListenerFactory {
+    private PluginManager pluginManager;
+
+    public FailureListenerFactory(Configuration configuration) {
+        this.pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
+    }
+
+    public List<FailureListener> createFailureListener(JobManagerJobMetricGroup metricGroup) {
+        List<FailureListener> failureListeners = new ArrayList<>();
+
+        ServiceLoader<FailureListener> serviceLoader = ServiceLoader.load(FailureListener.class);
+        Iterator<FailureListener> fromServiceLoader = serviceLoader.iterator();
+        Iterator<FailureListener> fromPluginManager = pluginManager.load(FailureListener.class);

Review comment:
       Are you using the service loader just for the default failure listener implementation? It might be nicer to add the default implementation(s) directly in code instead of providing two discovery mechanisms.
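Roughly what that could look like, as a hedged sketch: the default listener is instantiated directly in code, and only plugin-provided listeners come from a loader. `SimpleListener`, `DefaultSimpleListener` and `ListenerAssembler` are hypothetical names. The `Iterator` argument stands in for what `pluginManager.load(FailureListener.class)` returns in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the PR's FailureListener interface. */
interface SimpleListener {
    String name();
}

/** Hypothetical built-in listener, registered in code rather than via META-INF/services. */
final class DefaultSimpleListener implements SimpleListener {
    @Override
    public String name() {
        return "default";
    }
}

final class ListenerAssembler {
    /** Built-in listeners first, then whatever the plugin mechanism discovered. */
    static List<SimpleListener> assemble(Iterator<SimpleListener> fromPlugins) {
        List<SimpleListener> listeners = new ArrayList<>();
        listeners.add(new DefaultSimpleListener());
        fromPlugins.forEachRemaining(listeners::add);
        return listeners;
    }

    public static void main(String[] args) {
        SimpleListener plugin = () -> "plugin";
        // With no plugins installed, only the built-in listener remains.
        System.out.println(assemble(Collections.emptyIterator()).size());             // 1
        System.out.println(assemble(Arrays.asList(plugin).iterator()).get(1).name()); // plugin
    }
}
```

With this split, the `META-INF/services` file and the `ServiceLoader` lookup are no longer needed for the default implementation.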

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListenerFactory.java
##########
@@ -0,0 +1,57 @@
+public class FailureListenerFactory {
+    private PluginManager pluginManager;
+
+    public FailureListenerFactory(Configuration configuration) {
+        this.pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

Review comment:
       I have not worked with the Flink plugin system yet: are you sure it's the right approach to initialize the `PluginManager` here? Basically, the question is: is there one global plugin manager instance per Flink process, or is there a separate `PluginManager` for each pluggable implementation (metrics reporters, file system implementations, etc.)?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
##########
@@ -50,6 +50,7 @@ private MetricNames() {}
     public static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers";
 
     public static final String NUM_RESTARTS = "numRestarts";
+    public static final String NUM_JOB_FAILURE = "numJobFailure";

Review comment:
       I wonder whether our users will be confused by having two similar metrics: `numRestarts` and `numJobFailure`.
   What's the exact difference between them?
   
   I guess the number of failures can be higher than the number of restarts (local failures, ignored restarts), but I fear that a user just looking at the available metrics might pick whichever one they see first.
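To make the difference concrete, here is a hedged sketch; the PR may count differently, and `FailureCounters` and its methods are hypothetical. Every reported failure increments the failure counter. The restart counter only moves when handling the failure actually leads to a restart. In this model the failure count is therefore always at least the restart count.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the two metrics under discussion. */
final class FailureCounters {
    private final AtomicLong numJobFailure = new AtomicLong();
    private final AtomicLong numRestarts = new AtomicLong();

    /**
     * Records a failure. Only failures whose handling decides to restart
     * (as opposed to, e.g., failing the job for good) bump the restart count.
     */
    void onFailure(boolean willRestart) {
        numJobFailure.incrementAndGet();
        if (willRestart) {
            numRestarts.incrementAndGet();
        }
    }

    long failures() {
        return numJobFailure.get();
    }

    long restarts() {
        return numRestarts.get();
    }

    public static void main(String[] args) {
        FailureCounters counters = new FailureCounters();
        counters.onFailure(true);  // recoverable failure, job restarts
        counters.onFailure(true);
        counters.onFailure(false); // e.g. restart strategy rejects the restart
        // prints "3 failures, 2 restarts"
        System.out.println(counters.failures() + " failures, " + counters.restarts() + " restarts");
    }
}
```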

##########
File path: flink-tests/src/test/java/org/apache/flink/test/plugin/FailureListenerFactoryTest.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.DefaultPluginManager;
+import org.apache.flink.core.plugin.DirectoryBasedPluginFinder;
+import org.apache.flink.core.plugin.PluginDescriptor;
+import org.apache.flink.core.plugin.PluginFinder;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.FailureListener;
+import org.apache.flink.runtime.executiongraph.FailureListenerFactory;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/** Test for {@link org.apache.flink.runtime.executiongraph.FailureListenerFactory}. */
+public class FailureListenerFactoryTest extends PluginTestBase {

Review comment:
       Maybe rename it to `FailureListenerPluginTest`, since your PR adds two classes named `FailureListenerFactoryTest`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListenerFactory.java
##########
@@ -0,0 +1,57 @@
+public class FailureListenerFactory {
+    private PluginManager pluginManager;

Review comment:
       ```suggestion
       private final PluginManager pluginManager;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org