Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/07/01 08:26:22 UTC

[GitHub] [flink] azagrebin commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

URL: https://github.com/apache/flink/pull/8646#discussion_r298927259
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 ##########
 @@ -126,22 +131,20 @@ public void close() {
 			// close writing and reading threads with best effort and log problems
 			// first notify all to close, then wait until all are closed
 
-			for (WriterThread wt : writers) {
-				try {
-					wt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager writer thread.", t);
-				}
-			}
-			for (ReaderThread rt : readers) {
-				try {
-					rt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager reader thread.", t);
-				}
-			}
+			Iterable<AutoCloseable> writerThreadCloseables = Arrays.stream(writers)
+				.filter(Objects::nonNull)
+				.filter(Thread::isAlive)
+				.map(IOManagerAsync::getWriterThreadCloser)
+				.collect(Collectors.toList());
+
+			Iterable<AutoCloseable> readerThreadCloseables = Arrays.stream(readers)
+				.filter(Objects::nonNull)
+				.filter(Thread::isAlive)
+				.map(IOManagerAsync::getReaderThreadCloser)
+				.collect(Collectors.toList());
+
+			IOUtils.closeAll(writerThreadCloseables, readerThreadCloseables);
 
 Review comment:
   Instead of modifying the `IOUtils.closeAll` method, I would suggest the following approach:
   ```
   		// close writing and reading threads with best effort and log problems
   		// first notify all to close, then wait until all are closed
   
   		List<AutoCloseable> closeables = new ArrayList<>(2 * writers.length + 2 * readers.length + 1);
   
   		Arrays.stream(writers)
   			.filter(Objects::nonNull)
   			.filter(Thread::isAlive)
   			.map(IOManagerAsync::getWriterThreadCloser)
   			.forEach(closeables::add);
   
   		Arrays.stream(readers)
   			.filter(Objects::nonNull)
   			.filter(Thread::isAlive)
   			.map(IOManagerAsync::getReaderThreadCloser)
   			.forEach(closeables::add);
   
   		// skip null entries here as well: a bound method reference on null throws an NPE
   		for (WriterThread wt : writers) {
   			if (wt != null) {
   				closeables.add(wt::join);
   			}
   		}
   
   		for (ReaderThread rt : readers) {
   			if (rt != null) {
   				closeables.add(rt::join);
   			}
   		}
   
   		// make sure we call the super implementation in any case and at the last point,
   		// because this will clean up the I/O directories
   		closeables.add(super::close);
   
   		IOUtils.closeAll(closeables);
   ```
   This way we also try to close everything (thread shutdown, `join`, and `super.close`), even if one of the earlier steps fails.
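
To illustrate the intent, here is a minimal, self-contained sketch of the "collect everything into one list of `AutoCloseable`s and close them all" pattern. The class `CloseAllSketch` and its `closeAll` helper are hypothetical stand-ins written for this example; the helper only assumes the behaviour relied on above, namely that every element is attempted and failures are reported at the end. It is not Flink's actual `IOUtils` code.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseAllSketch {

    // Hypothetical stand-in for a closeAll helper: attempts every close(),
    // keeps the first failure and attaches later ones as suppressed exceptions.
    static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
        Exception collected = null;
        for (AutoCloseable closeable : closeables) {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            } catch (Exception e) {
                if (collected == null) {
                    collected = e;
                } else {
                    collected.addSuppressed(e);
                }
            }
        }
        if (collected != null) {
            throw collected;
        }
    }

    // Builds a shutdown sequence in which one step fails and returns the steps that still ran.
    static List<String> demo() {
        List<String> steps = new ArrayList<>();
        Thread worker = new Thread(() -> { });
        worker.start();

        List<AutoCloseable> closeables = new ArrayList<>();
        closeables.add(() -> steps.add("shutdown writer"));
        closeables.add(() -> {
            throw new IllegalStateException("reader shutdown failed");
        });
        // Thread.join() throws InterruptedException, which fits AutoCloseable.close()
        closeables.add(worker::join);
        // added last, mirroring the super.close() call in the suggestion
        closeables.add(() -> steps.add("super.close"));

        try {
            closeAll(closeables);
        } catch (Exception e) {
            steps.add("rethrown: " + e.getMessage());
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the failing reader shutdown does not prevent the `join` or the final `super.close` step from running; the failure only surfaces once every element has been attempted.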
