You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/04 17:47:02 UTC

[flink] branch release-1.10 updated: [FLINK-15838] Dangling CountDownLatch.await(timeout)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new d710a8c  [FLINK-15838] Dangling CountDownLatch.await(timeout)
d710a8c is described below

commit d710a8cc995737f1fcab2912312834a3b799c288
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Feb 4 00:56:55 2020 +0530

    [FLINK-15838] Dangling CountDownLatch.await(timeout)
    
    This closes #11005.
---
 .../org/apache/flink/core/fs/AbstractCloseableRegistryTest.java   | 2 +-
 .../flink/runtime/webmonitor/history/HistoryServerTest.java       | 8 +++++---
 .../flink/runtime/io/network/buffer/AbstractByteBufTest.java      | 6 +++---
 .../flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java     | 8 +++++---
 .../flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java  | 3 ++-
 .../java/org/apache/flink/test/checkpointing/RescalingITCase.java | 5 +++--
 .../java/org/apache/flink/test/checkpointing/SavepointITCase.java | 6 +++---
 7 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index d8d639e..ab14f77 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -240,7 +240,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 		 * Causes the current thread to wait until {@link #close()} is called.
 		 */
 		public void awaitClose(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
-			closeCalledLatch.await(timeout, timeUnit);
+			assertTrue(closeCalledLatch.await(timeout, timeUnit));
 		}
 	}
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9a5391a..71e8307 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -66,6 +66,8 @@ import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for the HistoryServer.
  */
@@ -142,7 +144,7 @@ public class HistoryServerTest extends TestLogger {
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
-			numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
+			assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));
 
 			Assert.assertEquals(numJobs + numLegacyJobs, getJobsOverview(baseUrl).getJobs().size());
 
@@ -193,7 +195,7 @@ public class HistoryServerTest extends TestLogger {
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
-			numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
+			assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));
 
 			Collection<JobDetails> jobs = getJobsOverview(baseUrl).getJobs();
 			Assert.assertEquals(numJobs, jobs.size());
@@ -207,7 +209,7 @@ public class HistoryServerTest extends TestLogger {
 			// delete one archive from jm
 			Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
 
-			numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS);
+			assertTrue(numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS));
 
 			// check that archive is present in hs
 			Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
index 4cca279..19fd597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
@@ -2402,7 +2402,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
                 }
             }).start();
         }
-        latch.await(10, TimeUnit.SECONDS);
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
         barrier.await(5, TimeUnit.SECONDS);
         buffer.release();
     }
@@ -2457,7 +2457,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
                 }
             }).start();
         }
-        latch.await(10, TimeUnit.SECONDS);
+		assertTrue(latch.await(10, TimeUnit.SECONDS));
         barrier.await(5, TimeUnit.SECONDS);
         buffer.release();
     }
@@ -2512,7 +2512,7 @@ public abstract class AbstractByteBufTest extends TestLogger {
                 }
             }).start();
         }
-        latch.await(10, TimeUnit.SECONDS);
+		assertTrue(latch.await(10, TimeUnit.SECONDS));
         barrier.await(5, TimeUnit.SECONDS);
         assertNull(cause.get());
         buffer.release();
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index c95cc61..7821afc 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -182,8 +182,10 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		// wait until we restart at least 2 times and until we see at least 10 checkpoints.
-		numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-		checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		assertTrue(numberOfRestarts.await(deadline.timeLeft().toMillis(),
+			TimeUnit.MILLISECONDS));
+		assertTrue(checkpointsToWaitFor.await(deadline.timeLeft().toMillis(),
+			TimeUnit.MILLISECONDS));
 
 		// verifying that we actually received a synchronous checkpoint
 		assertTrue(syncSavepointId.get() > 0);
@@ -287,7 +289,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 				null));
 
 		ClientUtils.submitJob(clusterClient, jobGraph);
-		invokeLatch.await(60, TimeUnit.SECONDS);
+		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 2fb3a93..85cacb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.isOneOf;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
@@ -114,7 +115,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
 			null));
 
 		ClientUtils.submitJob(clusterClient, jobGraph);
-		invokeLatch.await(60, TimeUnit.SECONDS);
+		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 735f795..80d0cb5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -82,6 +82,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test savepoint rescaling.
@@ -193,7 +194,7 @@ public class RescalingITCase extends TestLogger {
 			ClientUtils.submitJob(client, jobGraph);
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
-			SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 			// verify the current state
 
@@ -337,7 +338,7 @@ public class RescalingITCase extends TestLogger {
 			ClientUtils.submitJob(client, jobGraph);
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
-			SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 			// verify the current state
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index c89c095..16a76fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -436,7 +436,7 @@ public class SavepointITCase extends TestLogger {
 			JobID jobID = submissionResult.getJobID();
 
 			// wait for the Tasks to be ready
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 			savepointPath = client.triggerSavepoint(jobID, null).get();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
@@ -484,10 +484,10 @@ public class SavepointITCase extends TestLogger {
 			// Submit the job
 			ClientUtils.submitJob(client, modifiedJobGraph);
 			// Await state is restored
-			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertTrue(StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 			// Await some progress after restore
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 		} finally {
 			cluster.after();
 		}