You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/20 15:40:51 UTC

[flink] branch master updated: [FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a203351  [FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase
a203351 is described below

commit a20335147a17ada546d3ebc269af1d17b973799c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 20 17:40:47 2018 +0200

    [FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase
---
 .../ZooKeeperHighAvailabilityITCase.java           | 53 ++++++++++++++++++++--
 1 file changed, 50 insertions(+), 3 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index b83f89e..642af40 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -62,6 +63,11 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -163,7 +169,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 	 *       restored successfully
 	 * </ol>
 	 */
-	@Test(timeout = 120_000L)
+	@Test
 	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
 		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
 		CheckpointBlockingFunction.successfulRestores.set(0);
@@ -256,13 +262,54 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 			() -> clusterClient.getJobStatus(jobID),
 			Time.milliseconds(50),
 			deadline,
-			(jobStatus) -> jobStatus == JobStatus.FINISHED,
+			JobStatus::isGloballyTerminalState,
 			TestingUtils.defaultScheduledExecutor());
-		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+		try {
+			assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+		} catch (Throwable e) {
+			// include additional debugging information
+			StringWriter error = new StringWriter();
+			try (PrintWriter out = new PrintWriter(error)) {
+				out.println("The job did not finish in time.");
+				out.println("allowedInitializeCallsWithoutRestore= " + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.get());
+				out.println("illegalRestores= " + CheckpointBlockingFunction.illegalRestores.get());
+				out.println("successfulRestores= " + CheckpointBlockingFunction.successfulRestores.get());
+				out.println("afterMessWithZooKeeper= " + CheckpointBlockingFunction.afterMessWithZooKeeper.get());
+				out.println("failedAlready= " + CheckpointBlockingFunction.failedAlready.get());
+				out.println("currentJobStatus= " + clusterClient.getJobStatus(jobID).get());
+				out.println("numRestarts= " + RestartReporter.numRestarts.getValue());
+				out.println("threadDump= " + generateThreadDump());
+			}
+			throw new AssertionError(error.toString(), ExceptionUtils.stripCompletionException(e));
+		}
 
 		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
 	}
 
+	private static String generateThreadDump() {
+		final StringBuilder dump = new StringBuilder();
+		final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+		final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+		for (ThreadInfo threadInfo : threadInfos) {
+			dump.append('"');
+			dump.append(threadInfo.getThreadName());
+			dump.append('"');
+			final Thread.State state = threadInfo.getThreadState();
+			dump.append(System.lineSeparator());
+			dump.append("   java.lang.Thread.State: ");
+			dump.append(state);
+			final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+			for (final StackTraceElement stackTraceElement : stackTraceElements) {
+				dump.append(System.lineSeparator());
+				dump.append("        at ");
+				dump.append(stackTraceElement);
+			}
+			dump.append(System.lineSeparator());
+			dump.append(System.lineSeparator());
+		}
+		return dump.toString();
+	}
+
 	private static class UnboundedSource implements SourceFunction<String> {
 		private volatile boolean running = true;