You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/29 13:00:53 UTC

[flink] branch release-1.9 updated: [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 0e9f463  [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest
0e9f463 is described below

commit 0e9f463668378bd7469194ebf0af76e3c125f0d7
Author: Yu Li <li...@apache.org>
AuthorDate: Fri Jul 26 10:27:28 2019 +0200

    [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest
    
    Currently, test cases fail when closing the output stream if all data has been written
    but a ClosedByInterruptException occurs during the final phase. This commit fixes it by
    replacing try-with-resources with try/finally blocks that call IOUtils.closeQuietly.
    
    This closes #9235
---
 .../core/fs/AbstractRecoverableWriterTest.java     | 49 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index ab37a07..de9b095 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 
 		final Path path = new Path(testDir, "part-0");
 
-		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = writer.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 			stream.closeForCommit().commit();
 
@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 				Assert.assertEquals("part-0", fileContents.getKey().getName());
 				Assert.assertEquals(testData1, fileContents.getValue());
 			}
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 	}
 
@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 
 		final Path path = new Path(testDir, "part-0");
 
-		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = writer.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 			stream.persist();
 
@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 				Assert.assertEquals("part-0", fileContents.getKey().getName());
 				Assert.assertEquals(testData1 + testData2, fileContents.getValue());
 			}
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 	}
 
@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 		final RecoverableWriter initWriter = getNewFileSystemWriter();
 
 		final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
-		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = initWriter.open(path);
 			recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
 
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
 			recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 
 		final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 		final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
 				deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
 
-		try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
+		RecoverableFsDataOutputStream recoveredStream = null;
+		try {
+			recoveredStream = newWriter.recover(recoveredRecoverable);
 
 			// we expect the data to be truncated
 			Map<Path, String> files = getFileContentByPath(testDir);
@@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 				Assert.assertEquals("part-0", fileContents.getKey().getName());
 				Assert.assertEquals(expectedFinalContents, fileContents.getValue());
 			}
+		} finally {
+			IOUtils.closeQuietly(recoveredStream);
 		}
 	}
 
@@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 		final RecoverableWriter initWriter = getNewFileSystemWriter();
 
 		final RecoverableWriter.CommitRecoverable recoverable;
-		try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = initWriter.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
 			stream.persist();
@@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
 			recoverable = stream.closeForCommit().getRecoverable();
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 
 		final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
@@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 		final RecoverableWriter writer = getNewFileSystemWriter();
 		final Path path = new Path(testDir, "part-0");
 
-		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = writer.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
 			stream.closeForCommit().getRecoverable();
 			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 			fail();
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 	}
 
@@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 		final Path path = new Path(testDir, "part-0");
 
 		RecoverableWriter.ResumeRecoverable recoverable;
-		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = writer.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
 			recoverable = stream.persist();
 			stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
 			stream.closeForCommit().commit();
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 
 		// this should throw an exception as the file is already committed
@@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 
 		final RecoverableWriter.ResumeRecoverable recoverable1;
 		final RecoverableWriter.ResumeRecoverable recoverable2;
-		try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
+		RecoverableFsDataOutputStream stream = null;
+		try {
+			stream = writer.open(path);
 			stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
 			recoverable1 = stream.persist();
@@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
 
 			recoverable2 = stream.persist();
 			stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+		} finally {
+			IOUtils.closeQuietly(stream);
 		}
 
 		try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {