You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 12:49:40 UTC

[2/3] flink git commit: [FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

This closes #5064.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f1744b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f1744b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f1744b

Branch: refs/heads/release-1.4
Commit: b8f1744b54c10968a5a8cabf9d461346ccf1b586
Parents: 0b024aa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Nov 24 11:31:48 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 13:49:21 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousBufferFileWriterTest.java       |  7 ++
 .../BufferFileWriterFileSegmentReaderTest.java  |  6 ++
 .../iomanager/BufferFileWriterReaderTest.java   |  6 ++
 .../runtime/operators/hash/HashTableITCase.java | 11 +--
 .../hash/HashTablePerformanceComparison.java    |  6 +-
 .../runtime/operators/hash/HashTableTest.java   | 92 ++++++++++----------
 ...bstractSortMergeOuterJoinIteratorITCase.java |  3 -
 .../sort/FixedLengthRecordSorterTest.java       |  5 ++
 .../flink/test/manual/MassiveStringSorting.java | 24 ++++-
 .../test/manual/MassiveStringValueSorting.java  | 24 ++++-
 10 files changed, 117 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 0397de5..40f3e32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +47,11 @@ public class AsynchronousBufferFileWriterTest {
 
 	private AsynchronousBufferFileWriter writer;
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUp() throws IOException {
 		writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<WriteRequest>());

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 7fee0fd..0d554c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -55,6 +56,11 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 	private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<>();
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUpWriterAndReader() {
 		final FileIOChannel.ID channel = ioManager.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 2da0f7e..31702f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,6 +53,11 @@ public class BufferFileWriterReaderTest {
 
 	private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<>();
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUpWriterAndReader() {
 		final FileIOChannel.ID channel = ioManager.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index f3eac19..a94227c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -806,11 +806,8 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		final IOManager ioManager = new IOManagerAsync();
-		
 		// ----------------------------------------------------------------------------------------
-		
+
 		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
 			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, 
 			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
@@ -907,9 +904,6 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		IOManager ioManager = new IOManagerAsync();
-		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
 		
@@ -1022,9 +1016,6 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		IOManager ioManager = new IOManagerAsync();
-		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 10bac1f..f426a94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.junit.AfterClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -54,8 +55,6 @@ public class HashTablePerformanceComparison {
 	
 	private final TypePairComparator<IntPair, IntPair> pairComparator = new IntPairPairComparator();
 	
-	private IOManager ioManager = new IOManagerAsync();
-	
 	@Test
 	public void testCompactingHashMapPerformance() {
 		
@@ -132,6 +131,7 @@ public class HashTablePerformanceComparison {
 	
 	@Test
 	public void testMutableHashMapPerformance() {
+		final IOManager ioManager = new IOManagerAsync();
 		try {
 			final int NUM_MEM_PAGES = SIZE * NUM_PAIRS / PAGE_SIZE;
 			
@@ -207,6 +207,8 @@ public class HashTablePerformanceComparison {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
+		} finally {
+			ioManager.shutdown();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 7c385fc..bcf620c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -205,54 +205,58 @@ public class HashTableTest {
 	public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception {
 		final IOManager ioMan = new IOManagerAsync();
 
-		final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
-		final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
-		final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
+		try {
+			final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
+			final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
+			final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
 
-		@SuppressWarnings("unchecked")
-		final TypePairComparator<byte[], byte[]> pairComparator = new GenericPairComparator<>(
-			new BytePrimitiveArrayComparator(true), new BytePrimitiveArrayComparator(true));
-
-		final int pageSize = 128;
-		final int numSegments = 33;
-
-		List<MemorySegment> memory = getMemory(numSegments, pageSize);
-
-		MutableHashTable<byte[], byte[]> table = new MutableHashTable<byte[], byte[]>(
-			serializer,
-			serializer,
-			buildComparator,
-			probeComparator,
-			pairComparator,
-			memory,
-			ioMan,
-			1,
-			false);
-
-		int numElements = 9;
-
-		table.open(
-			new CombiningIterator<byte[]>(
-				new ByteArrayIterator(numElements, 128,(byte) 0),
-				new ByteArrayIterator(numElements, 128,(byte) 1)),
-			new CombiningIterator<byte[]>(
-				new ByteArrayIterator(1, 128,(byte) 0),
-				new ByteArrayIterator(1, 128,(byte) 1)));
-
-		while(table.nextRecord()) {
-			MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
-
-			int counter = 0;
-
-			while(iterator.next() != null) {
-				counter++;
+			@SuppressWarnings("unchecked") final TypePairComparator<byte[], byte[]> pairComparator =
+				new GenericPairComparator<>(
+					new BytePrimitiveArrayComparator(true), new BytePrimitiveArrayComparator(true));
+
+			final int pageSize = 128;
+			final int numSegments = 33;
+
+			List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+			MutableHashTable<byte[], byte[]> table = new MutableHashTable<byte[], byte[]>(
+				serializer,
+				serializer,
+				buildComparator,
+				probeComparator,
+				pairComparator,
+				memory,
+				ioMan,
+				1,
+				false);
+
+			int numElements = 9;
+
+			table.open(
+				new CombiningIterator<byte[]>(
+					new ByteArrayIterator(numElements, 128, (byte) 0),
+					new ByteArrayIterator(numElements, 128, (byte) 1)),
+				new CombiningIterator<byte[]>(
+					new ByteArrayIterator(1, 128, (byte) 0),
+					new ByteArrayIterator(1, 128, (byte) 1)));
+
+			while (table.nextRecord()) {
+				MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
+
+				int counter = 0;
+
+				while (iterator.next() != null) {
+					counter++;
+				}
+
+				// check that we retrieve all our elements
+				Assert.assertEquals(numElements, counter);
 			}
 
-			// check that we retrieve all our elements
-			Assert.assertEquals(numElements, counter);
+			table.close();
+		} finally {
+			ioMan.shutdown();
 		}
-
-		table.close();
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 28bded2..94c0fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -300,9 +300,6 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
 
 		TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator = new GenericPairComparator<>(comparator1, comparator2);
 
-		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManagerAsync();
-
 		final int DUPLICATE_KEY = 13;
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 288e86d..bba713e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -77,6 +77,11 @@ public class FixedLengthRecordSorterTest {
 		if (!this.memoryManager.verifyEmpty()) {
 			Assert.fail("Memory Leak: Some memory has not been returned to the memory manager.");
 		}
+
+		if (this.ioManager != null) {
+			ioManager.shutdown();
+			ioManager = null;
+		}
 		
 		if (this.memoryManager != null) {
 			this.memoryManager.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index c69e6fd..46e4485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -82,10 +82,12 @@ public class MassiveStringSorting {
 			UnilateralSortMerger<String> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
@@ -122,6 +124,12 @@ public class MassiveStringSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {
@@ -173,10 +181,12 @@ public class MassiveStringSorting {
 			UnilateralSortMerger<Tuple2<String, String[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
 						TypeInfoParser.<Tuple2<String, String[]>>parse("Tuple2<String, String[]>");
@@ -243,6 +253,12 @@ public class MassiveStringSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f1744b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 453aa14..1f72e4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -83,10 +83,12 @@ public class MassiveStringValueSorting {
 			UnilateralSortMerger<StringValue> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
 				TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class);
@@ -124,6 +126,12 @@ public class MassiveStringValueSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {
@@ -177,10 +185,12 @@ public class MassiveStringValueSorting {
 			UnilateralSortMerger<Tuple2<StringValue, StringValue[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
 						TypeInfoParser.<Tuple2<StringValue, StringValue[]>>parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>");
@@ -247,6 +257,12 @@ public class MassiveStringValueSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {