You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink and is not preserved in this plain-text rendering.)
Posted to commits@flink.apache.org by zh...@apache.org on 2020/02/04 04:01:24 UTC

[flink] branch release-1.9 updated: [FLINK-15010][network] Add shut down hook to ensure cleanup netty shuffle directories

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new b8221b0  [FLINK-15010][network] Add shut down hook to ensure cleanup netty shuffle directories
b8221b0 is described below

commit b8221b0758a2e0946f9ddd6ca537ce2a6e6a133a
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Tue Feb 4 12:01:07 2020 +0800

    [FLINK-15010][network] Add shut down hook to ensure cleanup netty shuffle directories
    
    When the cluster is shut down in standalone mode, the task manager is terminated via a SIGTERM signal.
    In this case, the shuffle directories created by FileChannelManager are never removed.
    
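    To solve this issue, the constructor of FileChannelManagerImpl now registers a shutdown hook that closes the manager (removing its directories), and it registers the hook before creating the directories so that they can always be removed when the JVM exits.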
    
    This closes #10736
---
 .../runtime/io/disk/FileChannelManagerImpl.java    |  34 ++++
 .../io/disk/FileChannelManagerImplTest.java        | 179 +++++++++++++++++++++
 2 files changed, 213 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
index 2bdb8d9..b6df3dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The manager used for creating/deleting file channels based on config temp dirs.
@@ -51,12 +54,30 @@ public class FileChannelManagerImpl implements FileChannelManager {
 	/** The number of the next path to use. */
 	private volatile int nextPath;
 
+	/** Prefix of the temporary directories to create. */
+	private final String prefix;
+
+	/**
+	 *  Flag to signal that the file channel manager has been shutdown already. The flag
+	 *  should support concurrent access for cases like multiple shutdown hooks.
+	 */
+	private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+	/** Shutdown hook to make sure that the directories are removed on exit. */
+	private final Thread shutdownHook;
+
 	public FileChannelManagerImpl(String[] tempDirs, String prefix) {
 		checkNotNull(tempDirs, "The temporary directories must not be null.");
 		checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");
 
 		this.random = new Random();
 		this.nextPath = 0;
+		this.prefix = prefix;
+
+		shutdownHook = ShutdownHookUtil.addShutdownHook(this, String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
+
+		// Creates directories after registering shutdown hook to ensure the directories can be
+		// removed if required.
 		this.paths = createFiles(tempDirs, prefix);
 	}
 
@@ -80,17 +101,23 @@ public class FileChannelManagerImpl implements FileChannelManager {
 
 	@Override
 	public ID createChannel() {
+		checkState(!isShutdown.get(), "File channel manager has shutdown.");
+
 		int num = getNextPathNum();
 		return new ID(paths[num], num, random);
 	}
 
 	@Override
 	public Enumerator createChannelEnumerator() {
+		checkState(!isShutdown.get(), "File channel manager has shutdown.");
+
 		return new Enumerator(paths, random);
 	}
 
 	@Override
 	public File[] getPaths() {
+		checkState(!isShutdown.get(), "File channel manager has shutdown.");
+
 		return Arrays.copyOf(paths, paths.length);
 	}
 
@@ -99,10 +126,17 @@ public class FileChannelManagerImpl implements FileChannelManager {
 	 */
 	@Override
 	public void close() throws Exception {
+		// Marks shutdown and exits if it has already shutdown.
+		if (!isShutdown.compareAndSet(false, true)) {
+			return;
+		}
+
 		IOUtils.closeAll(Arrays.stream(paths)
 			.filter(File::exists)
 			.map(FileChannelManagerImpl::getFileCloser)
 			.collect(Collectors.toList()));
+
+		ShutdownHookUtil.removeShutdownHook(shutdownHook, String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
 	}
 
 	private static AutoCloseable getFileCloser(File path) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java
new file mode 100644
index 0000000..33f0072
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests the logic of {@link FileChannelManagerImpl}.
+ */
+public class FileChannelManagerImplTest extends TestLogger {
+	private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImplTest.class);
+
+	private static final String DIR_NAME_PREFIX = "manager-test";
+
+	/**
+	 * Marker file indicating the test process is ready to be killed. We could not simply kill the process
+	 * after FileChannelManager has created temporary files since we also need to ensure the caller has
+	 * also registered the shutdown hook if <tt>callerHasHook</tt> is true.
+	 */
+	private static final String SIGNAL_FILE_FOR_KILLING = "could-kill";
+
+	private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10);
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testDirectoriesCleanupOnKillWithoutCallerHook() throws Exception {
+		testDirectoriesCleanupOnKill(false);
+	}
+
+	@Test
+	public void testDirectoriesCleanupOnKillWithCallerHook() throws Exception {
+		testDirectoriesCleanupOnKill(true);
+	}
+
+	private void testDirectoriesCleanupOnKill(boolean callerHasHook) throws Exception {
+		assumeTrue(OperatingSystem.isLinux()
+				|| OperatingSystem.isFreeBSD()
+				|| OperatingSystem.isSolaris()
+				|| OperatingSystem.isMac());
+
+		File fileChannelDir = temporaryFolder.newFolder();
+		File signalDir = temporaryFolder.newFolder();
+		File signalFile = new File(signalDir.getAbsolutePath(), SIGNAL_FILE_FOR_KILLING);
+
+		FileChannelManagerTestProcess fileChannelManagerTestProcess = new FileChannelManagerTestProcess(
+			callerHasHook,
+			fileChannelDir.getAbsolutePath(),
+			signalFile.getAbsolutePath());
+
+		try {
+			fileChannelManagerTestProcess.startProcess();
+
+			// Waits till the process has created temporary files and registered the corresponding shutdown hooks.
+			TestJvmProcess.waitForMarkerFile(signalFile, TEST_TIMEOUT.toMillis());
+
+			Process kill = Runtime.getRuntime().exec("kill " + fileChannelManagerTestProcess.getProcessId());
+			kill.waitFor();
+			assertEquals("Failed to send SIG_TERM to process", 0, kill.exitValue());
+
+			Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+			while (fileChannelManagerTestProcess.isAlive() && deadline.hasTimeLeft()) {
+				Thread.sleep(100);
+			}
+
+			assertFalse(
+				"The file channel manager test process does not terminate in time, its output is: \n" +
+					fileChannelManagerTestProcess.getProcessOutput(),
+				fileChannelManagerTestProcess.isAlive());
+
+			// Checks if the directories are cleared.
+			assertFalse(
+				"The file channel manager test process does not remove the tmp shuffle directories after termination, " +
+					"its output is \n" + fileChannelManagerTestProcess.getProcessOutput(),
+				fileOrDirExists(fileChannelDir, DIR_NAME_PREFIX));
+		} finally {
+			fileChannelManagerTestProcess.destroy();
+		}
+	}
+
+	private boolean fileOrDirExists(File rootTmpDir, String namePattern) {
+		File[] candidates = rootTmpDir.listFiles((dir, name) -> name.contains(namePattern));
+		return candidates != null && candidates.length > 0;
+	}
+
+	/**
+	 * The {@link FileChannelManagerCleanupRunner} instance running in a separate JVM process.
+	 */
+	private static class FileChannelManagerTestProcess extends TestJvmProcess {
+		private final boolean callerHasHook;
+		private final String tmpDirectories;
+		private final String signalFilePath;
+
+		FileChannelManagerTestProcess(boolean callerHasHook, String tmpDirectories, String signalFilePath) throws Exception {
+			this.callerHasHook = callerHasHook;
+			this.tmpDirectories = tmpDirectories;
+			this.signalFilePath = signalFilePath;
+		}
+
+		@Override
+		public String getName() {
+			return "File Channel Manager Test";
+		}
+
+		@Override
+		public String[] getJvmArgs() {
+			return new String[]{
+					Boolean.toString(callerHasHook),
+					tmpDirectories,
+					signalFilePath
+			};
+		}
+
+		@Override
+		public String getEntryPointClassName() {
+			return FileChannelManagerCleanupRunner.class.getName();
+		}
+	}
+
+	/**
+	 * The entry point class to test the file channel manager cleanup with shutdown hook.
+	 */
+	public static class FileChannelManagerCleanupRunner {
+
+		public static void main(String[] args) throws Exception{
+			boolean callerHasHook = Boolean.parseBoolean(args[0]);
+			String tmpDirectory = args[1];
+			String signalFilePath = args[2];
+
+			FileChannelManager manager = new FileChannelManagerImpl(new String[]{tmpDirectory}, DIR_NAME_PREFIX);
+
+			if (callerHasHook) {
+				// Verifies the case that both FileChannelManager and its upper component
+				// have registered shutdown hooks, like in IOManager.
+				ShutdownHookUtil.addShutdownHook(() -> manager.close(), "Caller", LOG);
+			}
+
+			// Signals the main process to execute the kill action.
+			new File(signalFilePath).createNewFile();
+
+			// Blocks the process to wait to be killed.
+			Thread.sleep(3 * TEST_TIMEOUT.toMillis());
+		}
+	}
+}