You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/04 16:02:18 UTC

flink git commit: [FLINK-3571] [tests] Add debug output to SavepointITCase

Repository: flink
Updated Branches:
  refs/heads/master c7595840f -> 21480e29a


[FLINK-3571] [tests] Add debug output to SavepointITCase

- If savepoint triggering fails, we only get a ClassCastException,
  which is not helpful for debugging (see [1])
- Adds more output in case of unexpected responses

[1] https://issues.apache.org/jira/browse/FLINK-3571


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21480e29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21480e29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21480e29

Branch: refs/heads/master
Commit: 21480e29a10ada9ae25e11731d0295ce8dcc5665
Parents: c759584
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 4 15:55:42 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 4 15:59:51 2016 +0200

----------------------------------------------------------------------
 .../flink/test/checkpointing/SavepointITCase.java    | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21480e29/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index b4ad0fa..106f7fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle;
@@ -804,9 +805,17 @@ public class SavepointITCase extends TestLogger {
 			Future<Object> savepointPathFuture = jobManager.ask(
 					new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
 
-			final String savepointPath = ((TriggerSavepointSuccess) Await
-					.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+			Object resp = Await.result(savepointPathFuture, deadline.timeLeft());
+
+			String savepointPath = null;
+			if (resp instanceof TriggerSavepointSuccess) {
+				savepointPath = ((TriggerSavepointSuccess) resp).savepointPath();
+				LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+			} else if (resp instanceof TriggerSavepointFailure) {
+				fail("Received TriggerSavepointFailure: " + ((TriggerSavepointFailure) resp).cause().getMessage());
+			} else {
+				fail("Unexpected response of type  " + resp.getClass() + " " + resp);
+			}
 
 			// Completed checkpoint
 			RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();