You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/21 23:32:31 UTC
[spark] branch branch-3.0 updated: [SPARK-32034][SQL] Port
HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon
shutdown
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c11078b [SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown
c11078b is described below
commit c11078b93af09b61e57e4a653d3a33494725cc1d
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sun Jun 21 16:28:00 2020 -0700
[SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown
### What changes were proposed in this pull request?
This PR ports HIVE-14817 (https://issues.apache.org/jira/browse/HIVE-14817) to the Spark Thrift Server.
### Why are the changes needed?
When HiveServer2 is stopped, the non-daemon timeout-checker thread keeps sleeping in the background pool and prevents the server from terminating:
```text
"HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178)
at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
Here is an example to reproduce:
https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala
It also causes the issues described in HIVE-14817.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Passing Jenkins
Closes #28870 from yaooqinn/SPARK-32034.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 9f8e15bb2e2189812ee34e3e64baede0d799ba76)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../hive/service/cli/session/SessionManager.java | 32 ++++++++++++++++------
.../hive/service/cli/session/SessionManager.java | 32 ++++++++++++++++------
2 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
index 859f9c8..ad6fb3b 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
}
}
+ private final Object timeoutCheckerLock = new Object();
+
private void startTimeoutChecker() {
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
- Runnable timeoutChecker = new Runnable() {
+ final Runnable timeoutChecker = new Runnable() {
@Override
public void run() {
- for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ sleepFor(interval);
+ while (!shutdown) {
long current = System.currentTimeMillis();
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+ if (shutdown) {
+ break;
+ }
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
SessionHandle handle = session.getSessionHandle();
@@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
session.closeExpiredOperations();
}
}
+ sleepFor(interval);
}
}
- private void sleepInterval(long interval) {
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- // ignore
+ private void sleepFor(long interval) {
+ synchronized (timeoutCheckerLock) {
+ try {
+ timeoutCheckerLock.wait(interval);
+ } catch (InterruptedException e) {
+ // Ignore, and break.
+ }
}
}
};
backgroundOperationPool.execute(timeoutChecker);
}
+ private void shutdownTimeoutChecker() {
+ shutdown = true;
+ synchronized (timeoutCheckerLock) {
+ timeoutCheckerLock.notify();
+ }
+ }
+
@Override
public synchronized void stop() {
super.stop();
- shutdown = true;
+ shutdownTimeoutChecker();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
long timeout = hiveConf.getTimeVar(
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
index 49221b1..5a381d1 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
}
}
+ private final Object timeoutCheckerLock = new Object();
+
private void startTimeoutChecker() {
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
- Runnable timeoutChecker = new Runnable() {
+ final Runnable timeoutChecker = new Runnable() {
@Override
public void run() {
- for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ sleepFor(interval);
+ while (!shutdown) {
long current = System.currentTimeMillis();
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+ if (shutdown) {
+ break;
+ }
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
SessionHandle handle = session.getSessionHandle();
@@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
session.closeExpiredOperations();
}
}
+ sleepFor(interval);
}
}
- private void sleepInterval(long interval) {
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- // ignore
+ private void sleepFor(long interval) {
+ synchronized (timeoutCheckerLock) {
+ try {
+ timeoutCheckerLock.wait(interval);
+ } catch (InterruptedException e) {
+ // Ignore, and break.
+ }
}
}
};
backgroundOperationPool.execute(timeoutChecker);
}
+ private void shutdownTimeoutChecker() {
+ shutdown = true;
+ synchronized (timeoutCheckerLock) {
+ timeoutCheckerLock.notify();
+ }
+ }
+
@Override
public synchronized void stop() {
super.stop();
- shutdown = true;
+ shutdownTimeoutChecker();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
long timeout = hiveConf.getTimeVar(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org