You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/04 17:47:02 UTC
[flink] branch release-1.10 updated: [FLINK-15838] Dangling CountDownLatch.await(timeout)
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new d710a8c [FLINK-15838] Dangling CountDownLatch.await(timeout)
d710a8c is described below
commit d710a8cc995737f1fcab2912312834a3b799c288
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Feb 4 00:56:55 2020 +0530
[FLINK-15838] Dangling CountDownLatch.await(timeout)
This closes #11005.
---
.../org/apache/flink/core/fs/AbstractCloseableRegistryTest.java | 2 +-
.../flink/runtime/webmonitor/history/HistoryServerTest.java | 8 +++++---
.../flink/runtime/io/network/buffer/AbstractByteBufTest.java | 6 +++---
.../flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java | 8 +++++---
.../flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java | 3 ++-
.../java/org/apache/flink/test/checkpointing/RescalingITCase.java | 5 +++--
.../java/org/apache/flink/test/checkpointing/SavepointITCase.java | 6 +++---
7 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index d8d639e..ab14f77 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -240,7 +240,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
* Causes the current thread to wait until {@link #close()} is called.
*/
public void awaitClose(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
- closeCalledLatch.await(timeout, timeUnit);
+ assertTrue(closeCalledLatch.await(timeout, timeUnit));
}
}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9a5391a..71e8307 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -66,6 +66,8 @@ import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests for the HistoryServer.
*/
@@ -142,7 +144,7 @@ public class HistoryServerTest extends TestLogger {
try {
hs.start();
String baseUrl = "http://localhost:" + hs.getWebPort();
- numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
+ assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));
Assert.assertEquals(numJobs + numLegacyJobs, getJobsOverview(baseUrl).getJobs().size());
@@ -193,7 +195,7 @@ public class HistoryServerTest extends TestLogger {
try {
hs.start();
String baseUrl = "http://localhost:" + hs.getWebPort();
- numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
+ assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));
Collection<JobDetails> jobs = getJobsOverview(baseUrl).getJobs();
Assert.assertEquals(numJobs, jobs.size());
@@ -207,7 +209,7 @@ public class HistoryServerTest extends TestLogger {
// delete one archive from jm
Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
- numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS);
+ assertTrue(numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS));
// check that archive is present in hs
Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
index 4cca279..19fd597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
@@ -2402,7 +2402,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
}
}).start();
}
- latch.await(10, TimeUnit.SECONDS);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
barrier.await(5, TimeUnit.SECONDS);
buffer.release();
}
@@ -2457,7 +2457,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
}
}).start();
}
- latch.await(10, TimeUnit.SECONDS);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
barrier.await(5, TimeUnit.SECONDS);
buffer.release();
}
@@ -2512,7 +2512,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
}
}).start();
}
- latch.await(10, TimeUnit.SECONDS);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
barrier.await(5, TimeUnit.SECONDS);
assertNull(cause.get());
buffer.release();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index c95cc61..7821afc 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -182,8 +182,10 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
}
// wait until we restart at least 2 times and until we see at least 10 checkpoints.
- numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(numberOfRestarts.await(deadline.timeLeft().toMillis(),
+ TimeUnit.MILLISECONDS));
+ assertTrue(checkpointsToWaitFor.await(deadline.timeLeft().toMillis(),
+ TimeUnit.MILLISECONDS));
// verifying that we actually received a synchronous checkpoint
assertTrue(syncSavepointId.get() > 0);
@@ -287,7 +289,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
null));
ClientUtils.submitJob(clusterClient, jobGraph);
- invokeLatch.await(60, TimeUnit.SECONDS);
+ assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
waitForJob();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 2fb3a93..85cacb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.isOneOf;
+import static org.junit.Assert.assertTrue;
/**
* Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
@@ -114,7 +115,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
null));
ClientUtils.submitJob(clusterClient, jobGraph);
- invokeLatch.await(60, TimeUnit.SECONDS);
+ assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
waitForJob();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 735f795..80d0cb5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -82,6 +82,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test savepoint rescaling.
@@ -193,7 +194,7 @@ public class RescalingITCase extends TestLogger {
ClientUtils.submitJob(client, jobGraph);
// wait til the sources have emitted numberElements for each key and completed a checkpoint
- SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// verify the current state
@@ -337,7 +338,7 @@ public class RescalingITCase extends TestLogger {
ClientUtils.submitJob(client, jobGraph);
// wait til the sources have emitted numberElements for each key and completed a checkpoint
- SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// verify the current state
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index c89c095..16a76fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -436,7 +436,7 @@ public class SavepointITCase extends TestLogger {
JobID jobID = submissionResult.getJobID();
// wait for the Tasks to be ready
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
savepointPath = client.triggerSavepoint(jobID, null).get();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
@@ -484,10 +484,10 @@ public class SavepointITCase extends TestLogger {
// Submit the job
ClientUtils.submitJob(client, modifiedJobGraph);
// Await state is restored
- StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Await some progress after restore
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
} finally {
cluster.after();
}