You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/12 17:35:54 UTC

[GitHub] asfgit closed pull request #7064: [FLINK-10753] Improve propagation and logging of snapshot exceptions

asfgit closed pull request #7064: [FLINK-10753] Improve propagation and logging of snapshot exceptions
URL: https://github.com/apache/flink/pull/7064
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..02b6fa4a2bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1249,11 +1249,14 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
 
 		final long checkpointId = pendingCheckpoint.getCheckpointId();
 
-		final String reason = (cause != null) ? cause.getMessage() : "";
+		LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
 
-		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
+		if (cause != null) {
+			pendingCheckpoint.abortError(cause);
+		} else {
+			pendingCheckpoint.abortDeclined();
+		}
 
-		pendingCheckpoint.abortDeclined();
 		rememberRecentCheckpointId(checkpointId);
 
 		// we don't have to schedule another "dissolving" checkpoint any more because the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4bf8d..1bc6b0e4baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -433,25 +434,23 @@ public void abortSubsumed() {
 		}
 	}
 
+
 	public void abortDeclined() {
-		try {
-			Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
-			onCompletionPromise.completeExceptionally(cause);
-			reportFailedCheckpoint(cause);
-		} finally {
-			dispose(true);
-		}
+		abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
 	}
 
 	/**
 	 * Aborts the pending checkpoint due to an error.
 	 * @param cause The error's exception.
 	 */
-	public void abortError(Throwable cause) {
+	public void abortError(@Nonnull Throwable cause) {
+		abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+	}
+
+	private void abortWithCause(@Nonnull Exception cause) {
 		try {
-			Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
-			onCompletionPromise.completeExceptionally(failure);
-			reportFailedCheckpoint(failure);
+			onCompletionPromise.completeExceptionally(cause);
+			reportFailedCheckpoint(cause);
 		} finally {
 			dispose(true);
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda1918..918fa50483d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class RpcCheckpointResponder implements CheckpointResponder {
 
+	private static final Logger LOG = LoggerFactory.getLogger(RpcCheckpointResponder.class);
+
 	private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
 
 	public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) {
@@ -57,6 +62,7 @@ public void declineCheckpoint(
 			long checkpointId, 
 			Throwable cause) {
 
+		LOG.info("Declining checkpoint {} of job {}.", checkpointId, jobID, cause);
 		checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a63a7971679..4967cb9ead9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -413,8 +413,10 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
 				snapshotException.addSuppressed(e);
 			}
 
-			throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
-				getOperatorName() + '.', snapshotException);
+			String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
+				getOperatorName() + ".";
+
+			throw new Exception(snapshotFailMessage, snapshotException);
 		}
 
 		return snapshotInProgress;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 3db0f62f829..097616feb96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -142,6 +142,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien
 			} catch (Exception e) {
 				String exceptionString = ExceptionUtils.stringifyException(e);
 				if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
+						|| exceptionString.matches("(.*\n)*.*was not running(.*\n)*")
 						|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new
 						|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new
 					throw e;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services