You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/02/27 09:01:17 UTC
[flink] branch master updated: [FLINK-16196][hive]
FlinkStandaloneHiveRunner leaks HMS process (#11169)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ee0c21e [FLINK-16196][hive] FlinkStandaloneHiveRunner leaks HMS process (#11169)
ee0c21e is described below
commit ee0c21e13599ebf3b8596ee7255d3eed51af7506
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Feb 27 17:01:01 2020 +0800
[FLINK-16196][hive] FlinkStandaloneHiveRunner leaks HMS process (#11169)
---
.../connectors/hive/FlinkStandaloneHiveRunner.java | 81 ++++++++++++----------
1 file changed, 44 insertions(+), 37 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 3485dd6..c9292d4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -376,45 +376,52 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
ProcessBuilder builder = new ProcessBuilder(args);
Process process = builder.start();
- Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
- Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
- inLogger.setDaemon(true);
- inLogger.setName("HMS-IN-Logger");
- errLogger.setDaemon(true);
- errLogger.setName("HMS-ERR-Logger");
- inLogger.start();
- errLogger.start();
-
- FutureTask<Void> res = new FutureTask<>(() -> {
- try {
- int r = process.waitFor();
- inLogger.join();
- errLogger.join();
- if (r != 0) {
- throw new RuntimeException("HMS process exited with " + r);
- }
- } catch (InterruptedException e) {
- LOGGER.info("Shutting down HMS");
- } finally {
- if (process.isAlive()) {
- // give it a chance to terminate gracefully
- process.destroy();
- try {
- process.waitFor(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+
+ try {
+ Thread inLogger = new Thread(new LogRedirect(process.getInputStream(), LOGGER));
+ Thread errLogger = new Thread(new LogRedirect(process.getErrorStream(), LOGGER));
+ inLogger.setDaemon(true);
+ inLogger.setName("HMS-IN-Logger");
+ errLogger.setDaemon(true);
+ errLogger.setName("HMS-ERR-Logger");
+ inLogger.start();
+ errLogger.start();
+
+ FutureTask<Void> res = new FutureTask<>(() -> {
+ try {
+ int r = process.waitFor();
+ inLogger.join();
+ errLogger.join();
+ if (r != 0) {
+ throw new RuntimeException("HMS process exited with " + r);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.info("Shutting down HMS");
+ } finally {
+ if (process.isAlive()) {
+ // give it a chance to terminate gracefully
+ process.destroy();
+ try {
+ process.waitFor(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
+ }
+ process.destroyForcibly();
}
- process.destroyForcibly();
}
- }
- }, null);
- Thread thread = new Thread(res);
- thread.setName("HMS-Watcher");
- // we need the watcher thread to kill HMS, don't make it daemon
- thread.setDaemon(false);
- thread.start();
- waitForHMSStart(port);
- return res;
+ }, null);
+ Thread thread = new Thread(res);
+ thread.setName("HMS-Watcher");
+ // we need the watcher thread to kill HMS, don't make it daemon
+ thread.setDaemon(false);
+ thread.start();
+ waitForHMSStart(port);
+ return res;
+ } catch (Throwable e) {
+ // make sure to kill the process in case anything goes wrong
+ process.destroyForcibly();
+ throw e;
+ }
}
private static void waitForHMSStart(int port) throws Exception {