You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/10/28 18:04:20 UTC
[hive] branch master updated: HIVE-24270: Move scratchdir cleanup to background
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f7e9d9b HIVE-24270: Move scratchdir cleanup to background
f7e9d9b is described below
commit f7e9d9b14e9f1fb266aefa9cad73d509d9d614af
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Tue Oct 13 14:14:10 2020 -0700
HIVE-24270: Move scratchdir cleanup to background
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
ql/src/java/org/apache/hadoop/hive/ql/Context.java | 13 +-
.../hadoop/hive/ql/cleanup/CleanupService.java | 38 ++++++
.../hive/ql/cleanup/EventualCleanupService.java | 145 ++++++++++++++++++++
.../hadoop/hive/ql/cleanup/SyncCleanupService.java | 68 +++++++++
.../hadoop/hive/ql/session/SessionState.java | 17 ++-
.../hadoop/hive/ql/cleanup/TestCleanupService.java | 152 +++++++++++++++++++++
.../hive/service/cli/session/HiveSessionImpl.java | 7 +-
.../hive/service/cli/session/SessionManager.java | 18 +++
9 files changed, 451 insertions(+), 12 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index edaa75b..45a44e9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5238,6 +5238,11 @@ public class HiveConf extends Configuration {
HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null),
"Maximal number of scheduled query executors to allow."),
+ HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT("hive.async.cleanup.service.thread.count", 10, new RangeValidator(0, null),
+ "Number of threads that run some eventual cleanup operations after queries/sessions close. 0 means cleanup is sync."),
+ HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE("hive.async.cleanup.service.queue.size", 10000, new RangeValidator(10, Integer.MAX_VALUE),
+ "Size of the async cleanup queue. If cleanup queue is full, cleanup operations become synchronous. " +
+ "Applicable only when number of async cleanup is turned on."),
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
"If the query results cache is enabled. This will keep results of previously executed queries " +
"to be reused if the same query is executed again."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index a41c5c8..e4141fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -673,22 +673,21 @@ public class Context {
if(this.fsResultCacheDirs != null) {
resultCacheDir = this.fsResultCacheDirs.toUri().getPath();
}
- for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
+ SessionState sessionState = SessionState.get();
+ for (Path p: fsScratchDirs.values()) {
try {
- Path p = entry.getValue();
if (p.toUri().getPath().contains(stagingDir) && subDirOf(p, fsScratchDirs.values()) ) {
LOG.debug("Skip deleting stagingDir: " + p);
FileSystem fs = p.getFileSystem(conf);
fs.cancelDeleteOnExit(p);
continue; // staging dir is deleted when deleting the scratch dir
}
- if(resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
+ if (resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
// delete only the paths which aren't result cache dir path
// because that will be taken care by removeResultCacheDir
- FileSystem fs = p.getFileSystem(conf);
- LOG.debug("Deleting scratch dir: {}", p);
- fs.delete(p, true);
- fs.cancelDeleteOnExit(p);
+ FileSystem fs = p.getFileSystem(conf);
+ LOG.info("Deleting scratch dir: {}", p);
+ sessionState.getCleanupService().deleteRecursive(p, fs);
}
} catch (Exception e) {
LOG.warn("Error Removing Scratch: "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java
new file mode 100644
index 0000000..919298e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A service to remove temporary files at the end of a query/session
+ */
+public interface CleanupService {
+
+ void start();
+
+ void shutdown();
+
+ void shutdownNow();
+
+ boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java
new file mode 100644
index 0000000..fcfde42
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class EventualCleanupService implements CleanupService {
+ private final int threadCount;
+ private final int queueSize;
+ private final ThreadFactory factory;
+ private final Logger LOG = LoggerFactory.getLogger(EventualCleanupService.class.getName());
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ private final BlockingQueue<AsyncDeleteAction> deleteActions;
+ private ExecutorService cleanerExecutorService;
+
+ public EventualCleanupService(int threadCount, int queueSize) {
+ if (queueSize < threadCount) {
+ throw new IllegalArgumentException("Queue size should be greater or equal to thread count. Queue size: "
+ + queueSize + ", thread count: " + threadCount);
+ }
+ this.factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("EventualCleanupService thread %d").build();
+ this.threadCount = threadCount;
+ this.queueSize = queueSize;
+ this.deleteActions = new LinkedBlockingQueue<>(queueSize);
+ }
+
+ @Override
+ public synchronized void start() {
+ if (cleanerExecutorService != null) {
+ LOG.debug("EventualCleanupService is already running.");
+ return;
+ }
+ cleanerExecutorService = Executors.newFixedThreadPool(threadCount, factory);
+ for (int i = 0; i < threadCount; i++) {
+ cleanerExecutorService.submit(new CleanupRunnable());
+ }
+ LOG.info("EventualCleanupService started with {} threads and queue of size {}", threadCount, queueSize);
+ }
+
+ @Override
+ public boolean deleteRecursive(Path path, FileSystem fileSystem) {
+ if (isRunning.get()) {
+ if (deleteActions.offer(new AsyncDeleteAction(path, fileSystem))) {
+ LOG.info("Delete {} operation was queued", path);
+ } else {
+ try {
+ fileSystem.cancelDeleteOnExit(path);
+ fileSystem.delete(path, true);
+ LOG.info("Deleted {} synchronously as the async queue was full", path);
+ } catch (IOException e) {
+ LOG.warn("Error removing path {}: {}", path, e);
+ }
+ }
+
+ return true;
+ } else {
+ LOG.warn("Delete request {} was ignored as cleanup service is shutting down", path);
+ return false;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ isRunning.set(false);
+ cleanerExecutorService.shutdown();
+ }
+
+ @Override
+ public void shutdownNow() {
+ isRunning.set(false);
+ cleanerExecutorService.shutdownNow();
+ }
+
+ @VisibleForTesting
+ public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ return cleanerExecutorService.awaitTermination(timeout, timeUnit);
+ }
+
+
+ private static class AsyncDeleteAction {
+ Path path;
+ FileSystem fileSystem;
+
+ public AsyncDeleteAction(Path path, FileSystem fileSystem) {
+ this.path = path;
+ this.fileSystem = fileSystem;
+ }
+ }
+
+ private class CleanupRunnable implements Runnable {
+ @Override
+ public void run() {
+ while (isRunning.get() || deleteActions.size() > 0) {
+ try {
+ AsyncDeleteAction deleteAction = deleteActions.poll(1, TimeUnit.MINUTES);
+ if (deleteAction != null) {
+ Path path = null;
+ try {
+ FileSystem fs = deleteAction.fileSystem;
+ path = deleteAction.path;
+ fs.delete(path, true);
+ fs.cancelDeleteOnExit(path);
+ LOG.info("Deleted {}", path);
+ } catch (IOException e) {
+ LOG.warn("Error removing path {}: {}", path, e);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("PathCleaner was interrupted");
+ }
+ }
+ LOG.info("Cleanup thread shutdown shutdown");
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java
new file mode 100644
index 0000000..597f6a2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Dummy cleanup service that just synchronously deletes files.
+ * This is created for use in background threads such as compactor
+ * or scheduled query runners.
+ */
+public class SyncCleanupService implements CleanupService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncCleanupService.class.getName());
+
+ public static SyncCleanupService INSTANCE = new SyncCleanupService();
+
+ private SyncCleanupService() {
+ //no-op
+ }
+
+ @Override
+ public void start() {
+ //no-op
+ }
+
+ @Override
+ public boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException {
+ fileSystem.cancelDeleteOnExit(path);
+ if (fileSystem.delete(path, true)) {
+ LOG.info("Deleted directory: {} on fs with scheme {}", path, fileSystem.getScheme());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ //no-op
+ }
+
+ @Override
+ public void shutdownNow() {
+ //no-op
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 1c3537f..658843a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -74,7 +74,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.cleanup.CleanupService;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService;
import org.apache.hadoop.hive.ql.exec.AddToClassPathAction;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.Registry;
@@ -320,6 +322,8 @@ public class SessionState implements ISessionAuthState{
private final Registry registry;
+ private final CleanupService cleanupService;
+
/**
* Used to cache functions in use for a query, during query planning
* and is later used for function usage authorization.
@@ -432,6 +436,10 @@ public class SessionState implements ISessionAuthState{
}
public SessionState(HiveConf conf, String userName) {
+ this(conf, userName, SyncCleanupService.INSTANCE);
+ }
+
+ public SessionState(HiveConf conf, String userName, CleanupService cleanupService) {
this.sessionConf = conf;
this.userName = userName;
this.registry = new Registry(false);
@@ -458,6 +466,7 @@ public class SessionState implements ISessionAuthState{
resourceDownloader = new ResourceDownloader(conf,
HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR));
killQuery = new NullKillQuery();
+ this.cleanupService = cleanupService;
ShimLoader.getHadoopShims().setHadoopSessionContext(getSessionId());
}
@@ -909,6 +918,10 @@ public class SessionState implements ISessionAuthState{
}
}
+ public CleanupService getCleanupService() {
+ return cleanupService;
+ }
+
private void dropSessionPaths(Configuration conf) throws IOException {
if (hdfsSessionPath != null) {
if (hdfsSessionPathLockFile != null) {
@@ -935,9 +948,7 @@ public class SessionState implements ISessionAuthState{
} else {
fs = path.getFileSystem(conf);
}
- fs.cancelDeleteOnExit(path);
- fs.delete(path, true);
- LOG.info("Deleted directory: {} on fs with scheme {}", path, fs.getScheme());
+ cleanupService.deleteRecursive(path, fs);
} catch (IllegalArgumentException | UnsupportedOperationException | IOException e) {
LOG.error("Failed to delete path at {} on fs with scheme {}", path,
(fs == null ? "Unknown-null" : fs.getScheme()), e);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java
new file mode 100644
index 0000000..9538a72
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestCleanupService {
+
+ private static final String TEMP_DIR = TestCleanupService.class.getName() + "-tempdir";
+ private CleanupService cleanupService;
+
+ @After
+ public void tearDown() throws IOException {
+ if (cleanupService != null) {
+ cleanupService.shutdownNow();
+ }
+ Path p = new Path(TEMP_DIR);
+ FileSystem fs = p.getFileSystem(new Configuration());
+ fs.delete(p, true);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEventualCleanupService_throwsWhenMisconfigured() {
+ cleanupService = new EventualCleanupService(10, 4);
+ }
+
+ @Test
+ public void testEventualCleanupService_deletesManyFiles() throws IOException, InterruptedException {
+ testDeleteManyFiles(new EventualCleanupService(4, 1000), 1000);
+ }
+
+ @Test
+ public void testEventualCleanupService_deletesManyFilesWithQueueSize4() throws IOException, InterruptedException {
+ testDeleteManyFiles(new EventualCleanupService(4, 4), 100);
+ }
+
+ @Test
+ public void testSyncCleanupService_deletesManyFiles() throws IOException, InterruptedException {
+ testDeleteManyFiles(SyncCleanupService.INSTANCE, 10);
+ }
+
+ @Test
+ public void testEventualCleanupService_finishesCleanupBeforeExit() throws IOException, InterruptedException {
+ EventualCleanupService cleanupService = new EventualCleanupService(4, 1000);
+ testDeleteManyFiles(cleanupService, 1000, true);
+ assertTrue(cleanupService.await(1, TimeUnit.SECONDS));
+ }
+
+ private void testDeleteManyFiles(CleanupService cleanupService, int n) throws IOException, InterruptedException {
+ testDeleteManyFiles(cleanupService, n, false);
+ }
+
+ private void testDeleteManyFiles(CleanupService cleanupService, int n,
+ boolean shutdownAfterQueueing) throws IOException, InterruptedException {
+ this.cleanupService = cleanupService;
+ Configuration conf = new Configuration();
+ cleanupService.start();
+
+ Collection<Path> files = createManyFiles(n);
+
+ for (Path p: files) {
+ cleanupService.deleteRecursive(p, p.getFileSystem(conf));
+ }
+
+ if (shutdownAfterQueueing) {
+ cleanupService.shutdown();
+ }
+
+ assertTrueEventually(() -> {
+ try {
+ for (Path p : files) {
+ FileSystem fs = p.getFileSystem(conf);
+ assertNotNull(fs);
+ assertFalse(p + " should not exist", fs.exists(p));
+ }
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ });
+ }
+
+ private Path createFile(String name) throws IOException {
+ Path p = new Path(TEMP_DIR, name);
+ FileSystem fs = p.getFileSystem(new Configuration());
+ fs.create(p);
+ return p;
+ }
+
+ private Collection<Path> createManyFiles(int n) throws IOException {
+ Collection<Path> files = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ files.add(createFile("many_" + i));
+ }
+ return files;
+ }
+
+ private void assertTrueEventually(AssertTask assertTask) throws InterruptedException {
+ assertTrueEventually(assertTask, 100000);
+ }
+
+ private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeoutMillis;
+ AssertionError assertionError = null;
+
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ assertTask.call();
+ return;
+ } catch (AssertionError e) {
+ assertionError = e;
+ sleep(50);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ throw assertionError;
+ }
+
+ private static interface AssertTask {
+ void call() throws AssertionError;
+ }
+}
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b415a76..82d039b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.FetchOrientation;
@@ -157,7 +156,11 @@ public class HiveSessionImpl implements HiveSession {
* That's why it is important to create SessionState here rather than in the constructor.
*/
public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
- sessionState = new SessionState(sessionConf, username);
+ if (sessionManager != null) {
+ sessionState = new SessionState(sessionConf, username, sessionManager.getCleanupService());
+ } else {
+ sessionState = new SessionState(sessionConf, username);
+ }
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses());
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index ae2c17b..11f8efd 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService;
import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.cleanup.CleanupService;
+import org.apache.hadoop.hive.ql.cleanup.EventualCleanupService;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.HiveSQLException;
@@ -101,6 +104,7 @@ public class SessionManager extends CompositeService {
private final HiveServer2 hiveServer2;
private String sessionImplWithUGIclassName;
private String sessionImplclassName;
+ private CleanupService cleanupService;
public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) {
super(SessionManager.class.getSimpleName());
@@ -135,6 +139,15 @@ public class SessionManager extends CompositeService {
userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS);
LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit,
userIpAddressLimit);
+
+ int cleanupThreadCount = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT);
+ int cleanupQueueSize = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE);
+ if (cleanupThreadCount > 0) {
+ cleanupService = new EventualCleanupService(cleanupThreadCount, cleanupQueueSize);
+ } else {
+ cleanupService = SyncCleanupService.INSTANCE;
+ }
+ cleanupService.start();
super.init(hiveConf);
}
@@ -279,6 +292,10 @@ public class SessionManager extends CompositeService {
}
}
+ public CleanupService getCleanupService() {
+ return cleanupService;
+ }
+
private final Object timeoutCheckerLock = new Object();
private void startTimeoutChecker() {
@@ -343,6 +360,7 @@ public class SessionManager extends CompositeService {
shutdownTimeoutChecker();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
+ cleanupService.shutdown();
long timeout = hiveConf.getTimeVar(
ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
try {