You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/04 16:02:18 UTC
flink git commit: [FLINK-3571] [tests] Add debug output to
SavepointITCase
Repository: flink
Updated Branches:
refs/heads/master c7595840f -> 21480e29a
[FLINK-3571] [tests] Add debug output to SavepointITCase
- If savepoint triggering fails, we only get a ClassCastException,
which is not helpful for debugging (see [1])
- Adds more output in case of unexpected responses
[1] https://issues.apache.org/jira/browse/FLINK-3571
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21480e29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21480e29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21480e29
Branch: refs/heads/master
Commit: 21480e29a10ada9ae25e11731d0295ce8dcc5665
Parents: c759584
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 4 15:55:42 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 4 15:59:51 2016 +0200
----------------------------------------------------------------------
.../flink/test/checkpointing/SavepointITCase.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21480e29/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index b4ad0fa..106f7fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle;
@@ -804,9 +805,17 @@ public class SavepointITCase extends TestLogger {
Future<Object> savepointPathFuture = jobManager.ask(
new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
- final String savepointPath = ((TriggerSavepointSuccess) Await
- .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
- LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+ Object resp = Await.result(savepointPathFuture, deadline.timeLeft());
+
+ String savepointPath = null;
+ if (resp instanceof TriggerSavepointSuccess) {
+ savepointPath = ((TriggerSavepointSuccess) resp).savepointPath();
+ LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+ } else if (resp instanceof TriggerSavepointFailure) {
+ fail("Received TriggerSavepointFailure: " + ((TriggerSavepointFailure) resp).cause().getMessage());
+ } else {
+ fail("Unexpected response of type " + resp.getClass() + " " + resp);
+ }
// Completed checkpoint
RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();