You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 10:16:39 UTC

[GitHub] zentol closed pull request #6939: [FLINK-10690][tests] Fix Files.list resource leaks

zentol closed pull request #6939: [FLINK-10690][tests] Fix Files.list resource leaks
URL: https://github.com/apache/flink/pull/6939
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 23878cbccb8..76b78056440 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -38,6 +38,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -227,12 +228,18 @@ private static void assertDirEquals(java.nio.file.Path expected, java.nio.file.P
 		assertEquals(expected.getFileName(), actual.getFileName());
 
 		if (Files.isDirectory(expected)) {
-			List<java.nio.file.Path> expectedContents = Files.list(expected)
-				.sorted(Comparator.comparing(java.nio.file.Path::toString))
-				.collect(Collectors.toList());
-			List<java.nio.file.Path> actualContents = Files.list(actual)
-				.sorted(Comparator.comparing(java.nio.file.Path::toString))
-				.collect(Collectors.toList());
+			List<java.nio.file.Path> expectedContents;
+			try (Stream<java.nio.file.Path> files = Files.list(expected)) {
+				expectedContents = files
+					.sorted(Comparator.comparing(java.nio.file.Path::toString))
+					.collect(Collectors.toList());
+			}
+			List<java.nio.file.Path> actualContents;
+			try (Stream<java.nio.file.Path> files = Files.list(actual)) {
+				actualContents = files
+					.sorted(Comparator.comparing(java.nio.file.Path::toString))
+					.collect(Collectors.toList());
+			}
 
 			assertEquals(expectedContents.size(), actualContents.size());
 
diff --git a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
index 167101cc9aa..388cdc6adfa 100644
--- a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
+++ b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -26,6 +26,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through
@@ -47,8 +48,10 @@ public static void main(String[] args) throws Exception {
 		env.registerCachedFile(inputFile.toString(), "test_data", false);
 		env.registerCachedFile(inputDir.toString(), "test_dir", false);
 
-		Path containedFile = Files.list(inputDir).findAny()
-			.orElseThrow(() -> new RuntimeException("Input directory must not be empty."));
+		final Path containedFile;
+		try (Stream<Path> files = Files.list(inputDir)) {
+			containedFile = files.findAny().orElseThrow(() -> new RuntimeException("Input directory must not be empty."));
+		}
 
 		env.fromElements(1)
 			.map(new TestMapFunction(
@@ -96,8 +99,10 @@ public String map(Integer value) throws Exception {
 					"initial dir. Input dir path: %s. Cache dir path: %s", initialDirPath, testDir));
 			}
 
-			if (Files.list(testDir).map(Path::getFileName).map(Path::toString).noneMatch(path -> path.equals(containedFileName))) {
-				throw new RuntimeException(String.format("Cached directory %s should not be empty.", testDir));
+			try (Stream<Path> files = Files.list(testDir)) {
+				if (files.map(Path::getFileName).map(Path::toString).noneMatch(path -> path.equals(containedFileName))) {
+					throw new RuntimeException(String.format("Cached directory %s should not be empty.", testDir));
+				}
 			}
 
 			return Files.readAllLines(testFile)
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 1042683c916..2e03197142a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -28,8 +28,10 @@
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
  * Tests for the {@link RefCountedFile}.
@@ -49,7 +51,9 @@ public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException {
 
 		fileUnderTest.release();
 
-		Assert.assertEquals(0L, Files.list(temporaryFolder.getRoot().toPath()).count());
+		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
+			Assert.assertEquals(0L, files.count());
+		}
 	}
 
 	@Test
@@ -76,7 +80,9 @@ public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOExcept
 
 		fileUnderTest.release();
 		// the file is deleted now
-		Assert.assertEquals(0L, Files.list(temporaryFolder.getRoot().toPath()).count());
+		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
+			Assert.assertEquals(0L, files.count());
+		}
 	}
 
 	@Test
@@ -111,7 +117,9 @@ public void flushAfterCloseShouldThrowException() throws IOException {
 	// ------------------------------------- Utilities -------------------------------------
 
 	private void verifyTheFileIsStillThere() throws IOException {
-		Assert.assertEquals(1L, Files.list(temporaryFolder.getRoot().toPath()).count());
+		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
+			Assert.assertEquals(1L, files.count());
+		}
 	}
 
 	private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException {
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
index 0f46f588264..4a1368a815e 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -164,7 +164,9 @@ public void cleanupAndCheckTmpCleanup() throws Exception {
 
 		// delete local tmp dir.
 		Assert.assertTrue(Files.exists(localTmpDir));
-		Assert.assertEquals(0L, Files.list(localTmpDir).count());
+		try (Stream<java.nio.file.Path> files = Files.list(localTmpDir)) {
+			Assert.assertEquals(0L, files.count());
+		}
 		Files.delete(localTmpDir);
 
 		// delete also S3 dir.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c350393371a..22de8a1dcde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -63,10 +63,10 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
@@ -182,14 +182,17 @@ public void after() {
 	}
 
 	public void assertUploadDirectoryIsEmpty() throws IOException {
-		Preconditions.checkArgument(
-			1 == Files.list(configuredUploadDir).count(),
-			"Directory structure in rest upload directory has changed. Test must be adjusted");
-		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
-		Preconditions.checkArgument(
-			actualUploadDir.isPresent(),
-			"Expected upload directory does not exist.");
-		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
+		Path actualUploadDir;
+		try (Stream<Path> containedFiles = Files.list(configuredUploadDir)) {
+			List<Path> files = containedFiles.collect(Collectors.toList());
+			Preconditions.checkArgument(
+				1 == files.size(),
+				"Directory structure in rest upload directory has changed. Test must be adjusted");
+			actualUploadDir = files.get(0);
+		}
+		try (Stream<Path> containedFiles = Files.list(actualUploadDir)) {
+			assertEquals("Not all files were cleaned up.", 0, containedFiles.count());
+		}
 	}
 
 	/**
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357dc4b6..5cbde5d49c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -53,6 +53,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -122,7 +123,10 @@ public void testStopJobAfterSavepoint() throws Exception {
 
 		assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
 
-		final List<Path> savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+		final List<Path> savepoints;
+		try (Stream<Path> savepointFiles = Files.list(savepointDirectory)) {
+			savepoints = savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+		}
 		assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName()));
 	}
 
@@ -136,7 +140,10 @@ public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() thro
 
 		assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
 
-		final List<Path> savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+		final List<Path> savepoints;
+		try (Stream<Path> savepointFiles = Files.list(savepointDirectory)) {
+			savepoints = savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+		}
 		assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName()));
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 0635f239c56..4a08b7c8305 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -54,6 +54,7 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -326,16 +327,18 @@ private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, J
 	}
 
 	private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
-		return Files.list(checkpointDir.toPath().resolve(jobId.toString()))
-			.filter(path -> path.getFileName().toString().startsWith("chk-"))
-			.filter(path -> {
-				try {
-					return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta"));
-				} catch (IOException ignored) {
-					return false;
-				}
-			})
-			.findAny();
+		try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+			return checkpoints
+				.filter(path -> path.getFileName().toString().startsWith("chk-"))
+				.filter(path -> {
+					try (Stream<Path> checkpointFiles = Files.list(path)) {
+						return checkpointFiles.anyMatch(child -> child.getFileName().toString().contains("meta"));
+					} catch (IOException ignored) {
+						return false;
+					}
+				})
+				.findAny();
+		}
 	}
 
 	private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 5fc356766f3..b272a2128ff 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -25,6 +25,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -41,12 +42,18 @@
 	public void testDeleteApplicationFiles() throws Exception {
 		final Path applicationFilesDir = temporaryFolder.newFolder(".flink").toPath();
 		Files.createFile(applicationFilesDir.resolve("flink.jar"));
-		assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(1L));
-		assertThat(Files.list(applicationFilesDir).count(), equalTo(1L));
+		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
+			assertThat(files.count(), equalTo(1L));
+		}
+		try (Stream<Path> files = Files.list(applicationFilesDir)) {
+			assertThat(files.count(), equalTo(1L));
+		}
 
 		Utils.deleteApplicationFiles(Collections.singletonMap(
 			YarnConfigKeys.FLINK_YARN_FILES,
 			applicationFilesDir.toString()));
-		assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(0L));
+		try (Stream<Path> files = Files.list(temporaryFolder.getRoot().toPath())) {
+			assertThat(files.count(), equalTo(0L));
+		}
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services