Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/19 19:00:07 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #14662: [FLINK-20675][checkpointing] Ensure asynchronous checkpoint failure could fail the job by default

rkhachatryan commented on a change in pull request #14662:
URL: https://github.com/apache/flink/pull/14662#discussion_r560372765



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -34,22 +34,18 @@
 
     private static final long serialVersionUID = 2094094662279578953L;
 
-    /** The reason why the checkpoint was declined. */
-    @Nullable private final SerializedThrowable reason;
-
-    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-        this(job, taskExecutionId, checkpointId, null);
-    }
+    /** The serialized reason why the checkpoint was declined. */
+    @Nonnull private final SerializedCheckpointException serializedCheckpointException;
 
     public DeclineCheckpoint(
             JobID job,
             ExecutionAttemptID taskExecutionId,
             long checkpointId,
-            @Nullable Throwable reason) {
+            @Nonnull CheckpointException checkpointException) {
         super(job, taskExecutionId, checkpointId);
 
-        // some other exception. replace with a serialized throwable, to be on the safe side
-        this.reason = reason == null ? null : new SerializedThrowable(reason);
+        // replace with a serialized throwable, to be on the safe side
+        this.serializedCheckpointException = new SerializedCheckpointException(checkpointException);

Review comment:
       I see that `checkpointException` is dereferenced further down, but could we add an explicit non-null check for extra safety?
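A minimal sketch of the explicit check suggested above. It uses `java.util.Objects.requireNonNull` so the example stays self-contained; inside Flink, `org.apache.flink.util.Preconditions.checkNotNull` would be the more usual choice. `FakeCheckpointException` and `DeclineMessage` are hypothetical, simplified stand-ins for `CheckpointException` and `DeclineCheckpoint`, not the classes from the PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for CheckpointException: only carries a message. */
class FakeCheckpointException extends Exception {
    FakeCheckpointException(String message) {
        super(message);
    }
}

/** Hypothetical, simplified stand-in for DeclineCheckpoint. */
class DeclineMessage {
    private final long checkpointId;
    private final String reason;

    DeclineMessage(long checkpointId, FakeCheckpointException checkpointException) {
        // Fail fast with a descriptive message instead of an anonymous NPE later,
        // when the exception is dereferenced to build the serialized form.
        Objects.requireNonNull(checkpointException, "checkpointException must not be null");
        this.checkpointId = checkpointId;
        this.reason = checkpointException.getMessage();
    }

    long getCheckpointId() {
        return checkpointId;
    }

    String getReason() {
        return reason;
    }
}
```

The check costs one comparison per message and turns a later, harder-to-trace failure into an immediate one at construction time.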

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/SerializedCheckpointException.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.io.Serializable;
+
+/**
+ * Serialized checkpoint exception which wraps the checkpoint failure reason and its serialized
+ * throwable.
+ */
+public class SerializedCheckpointException implements Serializable {

Review comment:
       Could you explain why just `SerializedThrowable` built from `CheckpointException` is not enough?
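One plausible reason for a dedicated wrapper, offered as an assumption rather than the PR author's stated rationale: the coordinator needs the failure reason to decide how to count the failure, and reading a plain enum field does not require deserializing a cause chain that may contain user classes. The sketch below models that idea with plain Java serialization; `Reason` and `ReasonPlusBytes` are hypothetical names, and this is not how Flink's `SerializedThrowable` is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical subset of failure reasons; the real CheckpointFailureReason has many more. */
enum Reason { CHECKPOINT_ASYNC_EXCEPTION, CHECKPOINT_DECLINED }

/**
 * Hypothetical wrapper: the reason is a plain enum field, while the cause travels as
 * opaque bytes that only need to be deserialized when someone wants the full cause.
 */
class ReasonPlusBytes implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Reason reason;
    private final byte[] serializedCause;

    ReasonPlusBytes(Reason reason, Throwable cause) throws IOException {
        this.reason = reason;
        this.serializedCause = toBytes(cause);
    }

    /** Readable without touching the serialized cause. */
    Reason getReason() {
        return reason;
    }

    /** Only this call needs the cause's classes to be resolvable. */
    Throwable deserializeCause() throws IOException, ClassNotFoundException {
        return (Throwable) fromBytes(serializedCause);
    }

    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```

If the reason can instead be recovered cheaply from a `SerializedThrowable` built from the `CheckpointException`, the extra class would indeed be redundant, which is what the question above is probing.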

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -237,19 +241,29 @@ private void handleExecutionException(Exception e) {
                                         + '.',
                                 e);
 
-                // We only report the exception for the original cause of fail and cleanup.
-                // Otherwise this followup exception could race the original exception in failing
-                // the task.
-                try {
-                    taskEnvironment.declineCheckpoint(
+                if (isTaskRunning.get()) {
+                    // We only report the exception for the original cause of fail and cleanup.
+                    // Otherwise this followup exception could race the original exception in
+                    // failing the task.
+                    try {
+                        taskEnvironment.declineCheckpoint(
+                                checkpointMetaData.getCheckpointId(),
+                                new CheckpointException(
+                                        CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
+                                        checkpointException));
+                    } catch (Exception unhandled) {
+                        AsynchronousException asyncException = new AsynchronousException(unhandled);
+                        asyncExceptionHandler.handleAsyncException(
+                                "Failure in asynchronous checkpoint materialization",
+                                asyncException);
+                    }
+                } else {
+                    // We never decline the checkpoint once the task is no longer running, to avoid
+                    // unexpected job failover caused by exceeding the tolerable failure threshold.
+                    LOG.warn(

Review comment:
       Could you explain how this change relates to the original motivation?
   
   What will happen to such a checkpoint? Will it time out?
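A hypothetical, simplified model of the two paths in the diff, to make the timeout question concrete. It assumes (not confirmed in this thread) that a checkpoint which is neither acknowledged nor declined stays pending until the coordinator's checkpoint timeout expires it. `Outcome`, `PendingCheckpointModel`, and `AsyncFailureReporter` are illustrative names only; the `isTaskRunning` guard mirrors the one in the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical outcome of a pending checkpoint in this simplified model. */
enum Outcome { PENDING, DECLINED, EXPIRED }

/** Hypothetical coordinator-side view of one pending checkpoint with a timeout. */
class PendingCheckpointModel {
    private final long deadlineMillis;
    private Outcome outcome = Outcome.PENDING;

    PendingCheckpointModel(long startMillis, long timeoutMillis) {
        this.deadlineMillis = startMillis + timeoutMillis;
    }

    /** A decline only has an effect while the checkpoint is still pending. */
    void decline() {
        if (outcome == Outcome.PENDING) {
            outcome = Outcome.DECLINED;
        }
    }

    /** Called periodically with the current time; expires the checkpoint at its deadline. */
    void tick(long nowMillis) {
        if (outcome == Outcome.PENDING && nowMillis >= deadlineMillis) {
            outcome = Outcome.EXPIRED;
        }
    }

    Outcome getOutcome() {
        return outcome;
    }
}

/** Hypothetical task-side guard mirroring the isTaskRunning check in the diff. */
class AsyncFailureReporter {
    private final AtomicBoolean isTaskRunning = new AtomicBoolean(true);

    void markNotRunning() {
        isTaskRunning.set(false);
    }

    /** Declines only while the task is running; otherwise the failure would just be logged. */
    boolean reportFailure(PendingCheckpointModel checkpoint) {
        if (isTaskRunning.get()) {
            checkpoint.decline();
            return true;
        }
        return false;
    }
}
```

In this model the undeclined checkpoint ends as `EXPIRED` rather than `DECLINED`. Whether an expiration counts toward the tolerable failure threshold in Flink is exactly what the question above would need to settle.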

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -34,22 +34,18 @@
 
     private static final long serialVersionUID = 2094094662279578953L;
 
-    /** The reason why the checkpoint was declined. */
-    @Nullable private final SerializedThrowable reason;
-
-    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-        this(job, taskExecutionId, checkpointId, null);
-    }
+    /** The serialized reason why the checkpoint was declined. */
+    @Nonnull private final SerializedCheckpointException serializedCheckpointException;

Review comment:
       nit: I think we can assume not null by default.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+
+/** Tests to verify end-to-end logic of checkpoint failure manager. */
+public class CheckpointFailureManagerITCase extends TestLogger {
+    private static MiniClusterWithClientResource cluster;
+
+    @Before
+    public void setup() throws Exception {
+        Configuration configuration = new Configuration();
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .build());
+        cluster.before();
+    }
+
+    @AfterClass
+    public static void shutDownExistingCluster() {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testAsyncCheckpointFailureTriggerJobFailed() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStateBackend(new AsyncFailureStateBackend());
+        env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // assert that the job only execute checkpoint once and only failed once.
+            TestUtils.submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            if (!jobException
+                    .getCause()
+                    .getCause()
+                    .equals(

Review comment:
       I think it would be more robust to use `ExceptionUtils.findThrowable`, rethrow it, and then check the exception message or type.
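A self-contained sketch of what the suggestion buys: searching the cause chain for a type instead of hard-coding `getCause().getCause()`. `CauseSearch.findCause` is a hypothetical helper written here to mirror the behavior of Flink's `ExceptionUtils.findThrowable(Throwable, Class)`, which returns an `Optional`; the actual test would use the Flink utility itself.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's ExceptionUtils.findThrowable(Throwable, Class). */
final class CauseSearch {
    private CauseSearch() {}

    static <T extends Throwable> Optional<T> findCause(Throwable throwable, Class<T> searchType) {
        // Walk the whole cause chain, so extra wrapping layers added by the runtime
        // do not break the assertion the way a fixed getCause().getCause() would.
        // Throwable.getCause() returns null for a self-referencing cause, ending the loop.
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (searchType.isInstance(t)) {
                return Optional.of(searchType.cast(t));
            }
        }
        return Optional.empty();
    }
}
```

The test would then assert that the expected exception type is present anywhere in the chain of the `JobExecutionException`, and check its message, independent of how many layers wrap it.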

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -69,6 +66,6 @@ public SerializedThrowable getReason() {
     public String toString() {
         return String.format(
                 "Declined Checkpoint %d for (%s/%s): %s",
-                getCheckpointId(), getJob(), getTaskExecutionId(), reason);
+                getCheckpointId(), getJob(), getTaskExecutionId(), serializedCheckpointException);

Review comment:
       `SerializedCheckpointException` doesn't override `toString`.
   Maybe `serializedCheckpointException.getCheckpointFailureReason()`?
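A sketch of the suggested `toString`, using hypothetical stand-ins (`FailureReason`, `SerializedReason`, `DeclineSketch`) rather than the real message classes. Formatting the enum yields a readable reason, whereas formatting a wrapper without its own `toString` falls back to `Object.toString()`, i.e. the class name plus a hash code.

```java
/** Hypothetical subset of failure reasons. */
enum FailureReason { CHECKPOINT_ASYNC_EXCEPTION }

/** Hypothetical stand-in for SerializedCheckpointException; deliberately has no toString. */
class SerializedReason {
    private final FailureReason reason;

    SerializedReason(FailureReason reason) {
        this.reason = reason;
    }

    FailureReason getCheckpointFailureReason() {
        return reason;
    }
}

/** Hypothetical stand-in for DeclineCheckpoint showing the suggested toString. */
class DeclineSketch {
    private final long checkpointId;
    private final String job;
    private final String taskExecutionId;
    private final SerializedReason serializedCheckpointException;

    DeclineSketch(long checkpointId, String job, String taskExecutionId, SerializedReason reason) {
        this.checkpointId = checkpointId;
        this.job = job;
        this.taskExecutionId = taskExecutionId;
        this.serializedCheckpointException = reason;
    }

    @Override
    public String toString() {
        // Format the enum (readable name) instead of the wrapper object.
        return String.format(
                "Declined Checkpoint %d for (%s/%s): %s",
                checkpointId,
                job,
                taskExecutionId,
                serializedCheckpointException.getCheckpointFailureReason());
    }
}
```

Alternatively, `SerializedCheckpointException` could override `toString` itself; either way the log line would carry the failure reason.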

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/SerializedCheckpointException.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.io.Serializable;
+
+/**
+ * Serialized checkpoint exception which wraps the checkpoint failure reason and its serialized
+ * throwable.
+ */
+public class SerializedCheckpointException implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final CheckpointFailureReason checkpointFailureReason;
+    private final SerializedThrowable serializedThrowable;

Review comment:
       I couldn't find usages of this field. It's unclear to me who is responsible for calling `deserializeError`. IIUC, it's not usable before this call.
   Maybe just drop the field?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org