You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/02/27 09:01:17 UTC

[flink] branch master updated: [FLINK-16196][hive] FlinkStandaloneHiveRunner leaks HMS process (#11169)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0c21e  [FLINK-16196][hive] FlinkStandaloneHiveRunner leaks HMS process (#11169)
ee0c21e is described below

commit ee0c21e13599ebf3b8596ee7255d3eed51af7506
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Feb 27 17:01:01 2020 +0800

    [FLINK-16196][hive] FlinkStandaloneHiveRunner leaks HMS process (#11169)
---
 .../connectors/hive/FlinkStandaloneHiveRunner.java | 81 ++++++++++++----------
 1 file changed, 44 insertions(+), 37 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 3485dd6..c9292d4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -376,45 +376,52 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
 
 		ProcessBuilder builder = new ProcessBuilder(args);
 		Process process = builder.start();
-		Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
-		Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
-		inLogger.setDaemon(true);
-		inLogger.setName("HMS-IN-Logger");
-		errLogger.setDaemon(true);
-		errLogger.setName("HMS-ERR-Logger");
-		inLogger.start();
-		errLogger.start();
-
-		FutureTask<Void> res = new FutureTask<>(() -> {
-			try {
-				int r = process.waitFor();
-				inLogger.join();
-				errLogger.join();
-				if (r != 0) {
-					throw new RuntimeException("HMS process exited with " + r);
-				}
-			} catch (InterruptedException e) {
-				LOGGER.info("Shutting down HMS");
-			} finally {
-				if (process.isAlive()) {
-					// give it a chance to terminate gracefully
-					process.destroy();
-					try {
-						process.waitFor(5, TimeUnit.SECONDS);
-					} catch (InterruptedException e) {
-						LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+
+		try {
+			Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
+			Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
+			inLogger.setDaemon(true);
+			inLogger.setName("HMS-IN-Logger");
+			errLogger.setDaemon(true);
+			errLogger.setName("HMS-ERR-Logger");
+			inLogger.start();
+			errLogger.start();
+
+			FutureTask<Void> res = new FutureTask<>(() -> {
+				try {
+					int r = process.waitFor();
+					inLogger.join();
+					errLogger.join();
+					if (r != 0) {
+						throw new RuntimeException("HMS process exited with " + r);
+					}
+				} catch (InterruptedException e) {
+					LOGGER.info("Shutting down HMS");
+				} finally {
+					if (process.isAlive()) {
+						// give it a chance to terminate gracefully
+						process.destroy();
+						try {
+							process.waitFor(5, TimeUnit.SECONDS);
+						} catch (InterruptedException e) {
+							LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+						}
+						process.destroyForcibly();
 					}
-					process.destroyForcibly();
 				}
-			}
-		}, null);
-		Thread thread = new Thread(res);
-		thread.setName("HMS-Watcher");
-		// we need the watcher thread to kill HMS, don't make it daemon
-		thread.setDaemon(false);
-		thread.start();
-		waitForHMSStart(port);
-		return res;
+			}, null);
+			Thread thread = new Thread(res);
+			thread.setName("HMS-Watcher");
+			// we need the watcher thread to kill HMS, don't make it daemon
+			thread.setDaemon(false);
+			thread.start();
+			waitForHMSStart(port);
+			return res;
+		} catch (Throwable e) {
+			// make sure to kill the process in case anything goes wrong
+			process.destroyForcibly();
+			throw e;
+		}
 	}
 
 	private static void waitForHMSStart(int port) throws Exception {