You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/20 15:40:51 UTC
[flink] branch master updated: [FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a203351 [FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase
a203351 is described below
commit a20335147a17ada546d3ebc269af1d17b973799c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 20 17:40:47 2018 +0200
[FLINK-9900][tests] Include more information on timeout in Zookeeper HA ITCase
---
.../ZooKeeperHighAvailabilityITCase.java | 53 ++++++++++++++++++++--
1 file changed, 50 insertions(+), 3 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index b83f89e..642af40 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -62,6 +63,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -163,7 +169,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
* restored successfully
* </ol>
*/
- @Test(timeout = 120_000L)
+ @Test
public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
CheckpointBlockingFunction.successfulRestores.set(0);
@@ -256,13 +262,54 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
() -> clusterClient.getJobStatus(jobID),
Time.milliseconds(50),
deadline,
- (jobStatus) -> jobStatus == JobStatus.FINISHED,
+ JobStatus::isGloballyTerminalState,
TestingUtils.defaultScheduledExecutor());
- assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+ try {
+ assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+ } catch (Throwable e) {
+ // include additional debugging information
+ StringWriter error = new StringWriter();
+ try (PrintWriter out = new PrintWriter(error)) {
+ out.println("The job did not finish in time.");
+ out.println("allowedInitializeCallsWithoutRestore= " + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.get());
+ out.println("illegalRestores= " + CheckpointBlockingFunction.illegalRestores.get());
+ out.println("successfulRestores= " + CheckpointBlockingFunction.successfulRestores.get());
+ out.println("afterMessWithZooKeeper= " + CheckpointBlockingFunction.afterMessWithZooKeeper.get());
+ out.println("failedAlready= " + CheckpointBlockingFunction.failedAlready.get());
+ out.println("currentJobStatus= " + clusterClient.getJobStatus(jobID).get());
+ out.println("numRestarts= " + RestartReporter.numRestarts.getValue());
+ out.println("threadDump= " + generateThreadDump());
+ }
+ throw new AssertionError(error.toString(), ExceptionUtils.stripCompletionException(e));
+ }
assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
}
+ /**
+ * Builds a textual dump of the stack traces of all threads of this JVM, which is appended to the
+ * assertion message when the job did not reach the expected terminal state.
+ *
+ * <p>Each stack trace is capped at 100 frames (the {@code maxDepth} argument of
+ * {@link ThreadMXBean#getThreadInfo(long[], int)}). A thread that terminates between
+ * {@link ThreadMXBean#getAllThreadIds()} and {@link ThreadMXBean#getThreadInfo(long[], int)}
+ * yields a {@code null} entry; such entries are skipped so that building the diagnostic message
+ * cannot itself fail with a {@link NullPointerException}.
+ *
+ * @return one entry per thread (quoted name, thread state, stack frames), separated by blank lines
+ */
+ private static String generateThreadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (ThreadInfo threadInfo : threadInfos) {
+ if (threadInfo == null) {
+ // the thread died after its id was collected; there is nothing left to report for it
+ continue;
+ }
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append('"');
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append(System.lineSeparator());
+ dump.append(" java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append(System.lineSeparator());
+ dump.append(" at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append(System.lineSeparator());
+ dump.append(System.lineSeparator());
+ }
+ return dump.toString();
+ }
+
private static class UnboundedSource implements SourceFunction<String> {
private volatile boolean running = true;