You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/07/02 14:33:36 UTC

[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r299514891
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 ##########
 @@ -118,75 +120,61 @@ public void shutdown() {
 		// Remove shutdown hook to prevent resource leaks
 		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 
-		try {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Shutting down I/O manager.");
-			}
-
-			// close writing and reading threads with best effort and log problems
-			// first notify all to close, then wait until all are closed
 
-			for (WriterThread wt : writers) {
-				try {
-					wt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager writer thread.", t);
-				}
-			}
-			for (ReaderThread rt : readers) {
-				try {
-					rt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager reader thread.", t);
-				}
-			}
-			try {
-				for (WriterThread wt : writers) {
-					wt.join();
-				}
-				for (ReaderThread rt : readers) {
-					rt.join();
-				}
-			}
-			catch (InterruptedException iex) {
-				// ignore this on shutdown
-			}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Shutting down I/O manager.");
 		}
-		finally {
-			// make sure we call the super implementation in any case and at the last point,
-			// because this will clean up the I/O directories
-			super.shutdown();
+
+		// close writing and reading threads with best effort and log problems
+		// first notify all to close, then wait until all are closed
+
+		List<AutoCloseable> closeables = new ArrayList<>(2 * writers.length + 2 * readers.length + 1);
+
+		for (WriterThread wt : writers) {
+			closeables.add(getWriterThreadCloser(wt));
+			closeables.add(wt::join);
 		}
-	}
-	
-	/**
-	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-	 * to be properly shut down when it is closed and its threads have ceased operation.
-	 * 
-	 * @return True, if the IO manager has properly shut down, false otherwise.
-	 */
-	@Override
-	public boolean isProperlyShutDown() {
-		boolean readersShutDown = true;
+
 		for (ReaderThread rt : readers) {
-			readersShutDown &= rt.getState() == Thread.State.TERMINATED;
+			closeables.add(getReaderThreadCloser(rt));
+			closeables.add(rt::join);
 
 Review comment:
   join is already called in (Reader/Writer)Thread.shutdown . We likely don't want to wait indefinitely in any case, particularly since we're looping over these and would swallow InterruptedExceptions in `IOUtils#closeAll`..

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services