You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/10/28 18:04:20 UTC

[hive] branch master updated: HIVE-24270: Move scratchdir cleanup to background

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f7e9d9b  HIVE-24270: Move scratchdir cleanup to background
f7e9d9b is described below

commit f7e9d9b14e9f1fb266aefa9cad73d509d9d614af
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Tue Oct 13 14:14:10 2020 -0700

    HIVE-24270: Move scratchdir cleanup to background
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 ql/src/java/org/apache/hadoop/hive/ql/Context.java |  13 +-
 .../hadoop/hive/ql/cleanup/CleanupService.java     |  38 ++++++
 .../hive/ql/cleanup/EventualCleanupService.java    | 145 ++++++++++++++++++++
 .../hadoop/hive/ql/cleanup/SyncCleanupService.java |  68 +++++++++
 .../hadoop/hive/ql/session/SessionState.java       |  17 ++-
 .../hadoop/hive/ql/cleanup/TestCleanupService.java | 152 +++++++++++++++++++++
 .../hive/service/cli/session/HiveSessionImpl.java  |   7 +-
 .../hive/service/cli/session/SessionManager.java   |  18 +++
 9 files changed, 451 insertions(+), 12 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index edaa75b..45a44e9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5238,6 +5238,11 @@ public class HiveConf extends Configuration {
     HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null),
         "Maximal number of scheduled query executors to allow."),
 
+    HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT("hive.async.cleanup.service.thread.count", 10, new RangeValidator(0, null),
+        "Number of threads that run some eventual cleanup operations after queries/sessions close. 0 means cleanup is sync."),
+    HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE("hive.async.cleanup.service.queue.size", 10000, new RangeValidator(10, Integer.MAX_VALUE),
+        "Size of the async cleanup queue. If cleanup queue is full, cleanup operations become synchronous. " +
+            "Applicable only when number of async cleanup is turned on."),
     HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
         "If the query results cache is enabled. This will keep results of previously executed queries " +
         "to be reused if the same query is executed again."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index a41c5c8..e4141fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -673,22 +673,21 @@ public class Context {
     if(this.fsResultCacheDirs != null) {
       resultCacheDir = this.fsResultCacheDirs.toUri().getPath();
     }
-    for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
+    SessionState sessionState = SessionState.get();
+    for (Path p: fsScratchDirs.values()) {
       try {
-        Path p = entry.getValue();
         if (p.toUri().getPath().contains(stagingDir) && subDirOf(p, fsScratchDirs.values())  ) {
           LOG.debug("Skip deleting stagingDir: " + p);
           FileSystem fs = p.getFileSystem(conf);
           fs.cancelDeleteOnExit(p);
           continue; // staging dir is deleted when deleting the scratch dir
         }
-        if(resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
+        if (resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
           // delete only the paths which aren't result cache dir path
           // because that will be taken care by removeResultCacheDir
-        FileSystem fs = p.getFileSystem(conf);
-        LOG.debug("Deleting scratch dir: {}",  p);
-        fs.delete(p, true);
-        fs.cancelDeleteOnExit(p);
+          FileSystem fs = p.getFileSystem(conf);
+          LOG.info("Deleting scratch dir: {}", p);
+          sessionState.getCleanupService().deleteRecursive(p, fs);
         }
       } catch (Exception e) {
         LOG.warn("Error Removing Scratch: "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java
new file mode 100644
index 0000000..919298e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A service to remove temporary files at the end of a query/session
+ */
+public interface CleanupService {
+
+  void start();
+
+  void shutdown();
+
+  void shutdownNow();
+
+  boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java
new file mode 100644
index 0000000..fcfde42
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class EventualCleanupService implements CleanupService {
+  private final int threadCount;
+  private final int queueSize;
+  private final ThreadFactory factory;
+  private final Logger LOG = LoggerFactory.getLogger(EventualCleanupService.class.getName());
+  private final AtomicBoolean isRunning = new AtomicBoolean(true);
+  private final BlockingQueue<AsyncDeleteAction> deleteActions;
+  private ExecutorService cleanerExecutorService;
+
+  public EventualCleanupService(int threadCount, int queueSize) {
+    if (queueSize < threadCount) {
+      throw new IllegalArgumentException("Queue size should be greater or equal to thread count. Queue size: "
+          + queueSize + ", thread count: " + threadCount);
+    }
+    this.factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("EventualCleanupService thread %d").build();
+    this.threadCount = threadCount;
+    this.queueSize = queueSize;
+    this.deleteActions = new LinkedBlockingQueue<>(queueSize);
+  }
+
+  @Override
+  public synchronized void start() {
+    if (cleanerExecutorService != null) {
+      LOG.debug("EventualCleanupService is already running.");
+      return;
+    }
+    cleanerExecutorService = Executors.newFixedThreadPool(threadCount, factory);
+    for (int i = 0; i < threadCount; i++) {
+      cleanerExecutorService.submit(new CleanupRunnable());
+    }
+    LOG.info("EventualCleanupService started with {} threads and queue of size {}", threadCount, queueSize);
+  }
+
+  @Override
+  public boolean deleteRecursive(Path path, FileSystem fileSystem) {
+    if (isRunning.get()) {
+      if (deleteActions.offer(new AsyncDeleteAction(path, fileSystem))) {
+        LOG.info("Delete {} operation was queued", path);
+      } else {
+        try {
+          fileSystem.cancelDeleteOnExit(path);
+          fileSystem.delete(path, true);
+          LOG.info("Deleted {} synchronously as the async queue was full", path);
+        } catch (IOException e) {
+          LOG.warn("Error removing path {}: {}", path, e);
+        }
+      }
+
+      return true;
+    } else {
+      LOG.warn("Delete request {} was ignored as cleanup service is shutting down", path);
+      return false;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    isRunning.set(false);
+    cleanerExecutorService.shutdown();
+  }
+
+  @Override
+  public void shutdownNow() {
+    isRunning.set(false);
+    cleanerExecutorService.shutdownNow();
+  }
+
+  @VisibleForTesting
+  public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
+    return cleanerExecutorService.awaitTermination(timeout, timeUnit);
+  }
+
+
+  private static class AsyncDeleteAction {
+    Path path;
+    FileSystem fileSystem;
+
+    public AsyncDeleteAction(Path path, FileSystem fileSystem) {
+      this.path = path;
+      this.fileSystem = fileSystem;
+    }
+  }
+
+  private class CleanupRunnable implements Runnable {
+    @Override
+    public void run() {
+      while (isRunning.get() || deleteActions.size() > 0) {
+        try {
+          AsyncDeleteAction deleteAction = deleteActions.poll(1, TimeUnit.MINUTES);
+          if (deleteAction != null) {
+            Path path = null;
+            try {
+              FileSystem fs = deleteAction.fileSystem;
+              path = deleteAction.path;
+              fs.delete(path, true);
+              fs.cancelDeleteOnExit(path);
+              LOG.info("Deleted {}", path);
+            } catch (IOException e) {
+              LOG.warn("Error removing path {}: {}", path, e);
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.debug("PathCleaner was interrupted");
+        }
+      }
+      LOG.info("Cleanup thread shutdown shutdown");
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java
new file mode 100644
index 0000000..597f6a2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Dummy cleanup service that just synchronously deletes files.
+ * This is created for use in background threads such as compactor
+ * or scheduled query runners.
+ */
+public class SyncCleanupService implements CleanupService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SyncCleanupService.class.getName());
+
+  public static SyncCleanupService INSTANCE = new SyncCleanupService();
+
+  private SyncCleanupService() {
+    //no-op
+  }
+
+  @Override
+  public void start() {
+    //no-op
+  }
+
+  @Override
+  public boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException {
+    fileSystem.cancelDeleteOnExit(path);
+    if (fileSystem.delete(path, true)) {
+      LOG.info("Deleted directory: {} on fs with scheme {}", path, fileSystem.getScheme());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    //no-op
+  }
+
+  @Override
+  public void shutdownNow() {
+    //no-op
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 1c3537f..658843a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -74,7 +74,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.cleanup.CleanupService;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService;
 import org.apache.hadoop.hive.ql.exec.AddToClassPathAction;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.Registry;
@@ -320,6 +322,8 @@ public class SessionState implements ISessionAuthState{
 
   private final Registry registry;
 
+  private final CleanupService cleanupService;
+
   /**
    * Used to cache functions in use for a query, during query planning
    * and is later used for function usage authorization.
@@ -432,6 +436,10 @@ public class SessionState implements ISessionAuthState{
   }
 
   public SessionState(HiveConf conf, String userName) {
+    this(conf, userName, SyncCleanupService.INSTANCE);
+  }
+
+  public SessionState(HiveConf conf, String userName, CleanupService cleanupService) {
     this.sessionConf = conf;
     this.userName = userName;
     this.registry = new Registry(false);
@@ -458,6 +466,7 @@ public class SessionState implements ISessionAuthState{
     resourceDownloader = new ResourceDownloader(conf,
         HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR));
     killQuery = new NullKillQuery();
+    this.cleanupService = cleanupService;
 
     ShimLoader.getHadoopShims().setHadoopSessionContext(getSessionId());
   }
@@ -909,6 +918,10 @@ public class SessionState implements ISessionAuthState{
     }
   }
 
+  public CleanupService getCleanupService() {
+    return cleanupService;
+  }
+
   private void dropSessionPaths(Configuration conf) throws IOException {
     if (hdfsSessionPath != null) {
       if (hdfsSessionPathLockFile != null) {
@@ -935,9 +948,7 @@ public class SessionState implements ISessionAuthState{
       } else {
         fs = path.getFileSystem(conf);
       }
-      fs.cancelDeleteOnExit(path);
-      fs.delete(path, true);
-      LOG.info("Deleted directory: {} on fs with scheme {}", path, fs.getScheme());
+      cleanupService.deleteRecursive(path, fs);
     } catch (IllegalArgumentException | UnsupportedOperationException | IOException e) {
       LOG.error("Failed to delete path at {} on fs with scheme {}", path,
           (fs == null ? "Unknown-null" : fs.getScheme()), e);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java
new file mode 100644
index 0000000..9538a72
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cleanup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestCleanupService {
+
+  private static final String TEMP_DIR = TestCleanupService.class.getName() + "-tempdir";
+  private CleanupService cleanupService;
+
+  @After
+  public void tearDown() throws IOException {
+    if (cleanupService != null) {
+      cleanupService.shutdownNow();
+    }
+    Path p = new Path(TEMP_DIR);
+    FileSystem fs = p.getFileSystem(new Configuration());
+    fs.delete(p, true);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEventualCleanupService_throwsWhenMisconfigured() {
+    cleanupService = new EventualCleanupService(10, 4);
+  }
+
+  @Test
+  public void testEventualCleanupService_deletesManyFiles() throws IOException, InterruptedException {
+    testDeleteManyFiles(new EventualCleanupService(4, 1000), 1000);
+  }
+
+  @Test
+  public void testEventualCleanupService_deletesManyFilesWithQueueSize4() throws IOException, InterruptedException {
+    testDeleteManyFiles(new EventualCleanupService(4, 4), 100);
+  }
+
+  @Test
+  public void testSyncCleanupService_deletesManyFiles() throws IOException, InterruptedException {
+    testDeleteManyFiles(SyncCleanupService.INSTANCE, 10);
+  }
+
+  @Test
+  public void testEventualCleanupService_finishesCleanupBeforeExit() throws IOException, InterruptedException {
+    EventualCleanupService cleanupService = new EventualCleanupService(4, 1000);
+    testDeleteManyFiles(cleanupService, 1000, true);
+    assertTrue(cleanupService.await(1, TimeUnit.SECONDS));
+  }
+
+  private void testDeleteManyFiles(CleanupService cleanupService, int n) throws IOException, InterruptedException {
+    testDeleteManyFiles(cleanupService, n, false);
+  }
+
+  private void testDeleteManyFiles(CleanupService cleanupService, int n,
+                                   boolean shutdownAfterQueueing) throws IOException, InterruptedException {
+    this.cleanupService = cleanupService;
+    Configuration conf = new Configuration();
+    cleanupService.start();
+
+    Collection<Path> files = createManyFiles(n);
+
+    for (Path p: files) {
+      cleanupService.deleteRecursive(p, p.getFileSystem(conf));
+    }
+
+    if (shutdownAfterQueueing) {
+      cleanupService.shutdown();
+    }
+
+    assertTrueEventually(() -> {
+      try {
+        for (Path p : files) {
+          FileSystem fs = p.getFileSystem(conf);
+          assertNotNull(fs);
+          assertFalse(p + " should not exist", fs.exists(p));
+        }
+      } catch (Exception e) {
+        throw new AssertionError(e);
+      }
+    });
+  }
+
+  private Path createFile(String name) throws IOException {
+    Path p = new Path(TEMP_DIR, name);
+    FileSystem fs = p.getFileSystem(new Configuration());
+    fs.create(p);
+    return p;
+  }
+
+  private Collection<Path> createManyFiles(int n) throws IOException {
+    Collection<Path> files = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      files.add(createFile("many_" + i));
+    }
+    return files;
+  }
+
+  private void assertTrueEventually(AssertTask assertTask) throws InterruptedException {
+    assertTrueEventually(assertTask, 100000);
+  }
+
+  private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) throws InterruptedException {
+    long endTime = System.currentTimeMillis() + timeoutMillis;
+    AssertionError assertionError = null;
+
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        assertTask.call();
+        return;
+      } catch (AssertionError e) {
+        assertionError = e;
+        sleep(50);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    throw assertionError;
+  }
+
+  private static interface AssertTask {
+    void call() throws AssertionError;
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b415a76..82d039b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.processors.SetProcessor;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.FetchOrientation;
@@ -157,7 +156,11 @@ public class HiveSessionImpl implements HiveSession {
    * That's why it is important to create SessionState here rather than in the constructor.
    */
   public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
-    sessionState = new SessionState(sessionConf, username);
+    if (sessionManager != null) {
+      sessionState = new SessionState(sessionConf, username, sessionManager.getCleanupService());
+    } else {
+      sessionState = new SessionState(sessionConf, username);
+    }
     sessionState.setUserIpAddress(ipAddress);
     sessionState.setIsHiveServerQuery(true);
     sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses());
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index ae2c17b..11f8efd 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.cleanup.CleanupService;
+import org.apache.hadoop.hive.ql.cleanup.EventualCleanupService;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -101,6 +104,7 @@ public class SessionManager extends CompositeService {
   private final HiveServer2 hiveServer2;
   private String sessionImplWithUGIclassName;
   private String sessionImplclassName;
+  private CleanupService cleanupService;
 
   public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) {
     super(SessionManager.class.getSimpleName());
@@ -135,6 +139,15 @@ public class SessionManager extends CompositeService {
     userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS);
     LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit,
       userIpAddressLimit);
+
+    int cleanupThreadCount = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT);
+    int cleanupQueueSize = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE);
+    if (cleanupThreadCount > 0) {
+      cleanupService = new EventualCleanupService(cleanupThreadCount, cleanupQueueSize);
+    } else {
+      cleanupService = SyncCleanupService.INSTANCE;
+    }
+    cleanupService.start();
     super.init(hiveConf);
   }
 
@@ -279,6 +292,10 @@ public class SessionManager extends CompositeService {
     }
   }
 
+  public CleanupService getCleanupService() {
+    return cleanupService;
+  }
+
   private final Object timeoutCheckerLock = new Object();
 
   private void startTimeoutChecker() {
@@ -343,6 +360,7 @@ public class SessionManager extends CompositeService {
     shutdownTimeoutChecker();
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
+      cleanupService.shutdown();
       long timeout = hiveConf.getTimeVar(
           ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
       try {