You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/08/31 20:43:38 UTC

hive git commit: HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN (Sahil Takiar, reviewed by Adam Szita)

Repository: hive
Updated Branches:
  refs/heads/master 219538701 -> 189d3fec2


HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN (Sahil Takiar, reviewed by Adam Szita)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/189d3fec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/189d3fec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/189d3fec

Branch: refs/heads/master
Commit: 189d3fec25dfb94b209b1a34c1be674ce9d85bc5
Parents: 2195387
Author: Sahil Takiar <ta...@gmail.com>
Authored: Mon Jul 16 10:26:21 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Aug 31 15:42:57 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +
 .../ql/exec/spark/TestSparkSessionTimeout.java  | 145 +++++++++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  10 +
 .../ql/exec/spark/session/SparkSession.java     |  27 ++
 .../ql/exec/spark/session/SparkSessionImpl.java | 246 ++++++++++++++-----
 .../spark/session/SparkSessionManagerImpl.java  |  63 +++--
 6 files changed, 423 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8c39de3..40ea3ac 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4311,6 +4311,12 @@ public class HiveConf extends Configuration {
         "specified (default) then the spark-submit shell script is used to launch the Spark " +
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
+    SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.MINUTES,
+            30L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+            " a Spark job to be submitted before shutting down. Minimum value is 30 minutes"),
+    SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "60s",
+            new TimeValidator(TimeUnit.SECONDS, 60L, true, null, true),
+            "How frequently to check for idle Spark sessions. Minimum value is 60 seconds."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
new file mode 100644
index 0000000..c887297
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Tests that an idle Hive on Spark session is closed once the configured session timeout has
+ * elapsed, and that a query submitted afterwards still succeeds.
+ */
+public class TestSparkSessionTimeout {
+
+  private static final int NUM_CONCURRENT_SESSIONS = 10;
+
+  @Test
+  public void testSparkSessionTimeout() throws HiveException, InterruptedException, MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+    HiveConf conf = new HiveConf();
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString());
+
+    SessionState.start(conf);
+
+    runTestSparkSessionTimeout(conf);
+  }
+
+  @Test
+  public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
+          ExecutionException {
+    runConcurrentSparkSessionTimeouts();
+  }
+
+  @Test
+  public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException {
+    runConcurrentSparkSessionTimeouts();
+  }
+
+  /**
+   * Shared body of the two multi-session tests: runs the timeout scenario on ten threads at
+   * once, each with its own {@link HiveConf} and {@link SessionState}.
+   */
+  private void runConcurrentSparkSessionTimeouts() throws InterruptedException,
+          ExecutionException {
+    List<Future<Void>> futures = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(NUM_CONCURRENT_SESSIONS);
+    try {
+      for (int i = 0; i < NUM_CONCURRENT_SESSIONS; i++) {
+        futures.add(es.submit(() -> {
+          String confDir = "../../data/conf/spark/local/hive-site.xml";
+          HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+          HiveConf conf = new HiveConf();
+          conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+          conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+                  "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+
+          SessionState.start(conf);
+
+          runTestSparkSessionTimeout(conf);
+          return null;
+        }));
+      }
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } finally {
+      // Without this the pool's non-daemon worker threads would outlive the test.
+      es.shutdown();
+    }
+  }
+
+  /**
+   * Runs a query, idles past the (shortened) session timeout, checks that the Spark session
+   * was closed, and then checks that a further query still succeeds.
+   */
+  private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
+          InterruptedException {
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+    String tableName = "test" + UUID.randomUUID().toString().replace("-", "");
+    Driver driver = null;
+    try {
+      driver = new Driver(new QueryState.Builder()
+              .withGenerateNewQueryId(true)
+              .withHiveConf(conf).build(),
+              null, null);
+      SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl
+              .getInstance());
+
+      Assert.assertEquals(0,
+              driver.run("create table " + tableName + " (col int)").getResponseCode());
+      Assert.assertEquals(0,
+              driver.run("select * from " + tableName + " order by col").getResponseCode());
+
+      // Twice the 5s timeout, so the 1s periodic check has had time to close the idle session.
+      Thread.sleep(10000);
+      Assert.assertFalse(sparkSession.isOpen());
+      Assert.assertEquals(0,
+              driver.run("select * from " + tableName + " order by col").getResponseCode());
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 737debd..dad2035 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
@@ -544,6 +545,11 @@ public class Driver implements IDriver {
 
     String queryId = queryState.getQueryId();
 
+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQuerySubmission(queryId);
+    }
+
     if (ctx != null) {
       setTriggerContext(queryId);
     }
@@ -2570,6 +2576,10 @@ public class Driver implements IDriver {
         queryState.setNumModifiedRows(numModifiedRows);
         console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
       }
+      SparkSession ss = SessionState.get().getSparkSession();
+      if (ss != null) {
+        ss.onQueryCompletion(queryId);
+      }
       lDrvState.stateLock.lock();
       try {
         lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;

http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index f96a8f7..62f88c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
 import java.io.IOException;
 
 public interface SparkSession {
+
   /**
    * Initializes a Spark session for DAG execution.
    * @param conf Hive configuration.
@@ -75,4 +76,30 @@ public interface SparkSession {
    * Get an HDFS dir specific to the SparkSession
    * */
   Path getHDFSSessionDir() throws IOException;
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has completed.
+   *
+   * @param queryId the id of the query that completed
+   */
+  void onQueryCompletion(String queryId);
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has been submitted.
+   *
+   * @param queryId the id of the query that was submitted
+   */
+  void onQuerySubmission(String queryId);
+
+  /**
+   * Checks if a session has timed out, and closes the session if the timeout has occurred;
+   * returns true if the session timed out, and false otherwise.
+   *
+   * @param sessionTimeout the session timeout, in milliseconds
+   *
+   * @return true if the session timed out and was closed, false otherwise
+   */
+  boolean triggerTimeout(long sessionTimeout);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 2015810..6a8b42e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.UUID;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -28,13 +30,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.session.SessionState;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -43,12 +49,31 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * Implementation of {@link SparkSession} that treats each Spark session as a separate Spark
+ * application.
+ *
+ * <p>
+ *   It uses a {@link HiveSparkClient} to submit a Spark application and to submit Spark jobs to
+ *   the Spark app.
+ * </p>
+ *
+ * <p>
+ *   This class contains logic to trigger a timeout of this {@link SparkSession} if certain
+ *   conditions are met (e.g. a job hasn't been submitted in the past "x" seconds). Since we use
+ *   a threadpool to schedule a task that regularly checks if a session has timed out, we need to
+ *   properly synchronize the {@link #open(HiveConf)} and {@link #close()} methods. We use a
+ *   series of volatile variables and read-write locks to ensure this.
+ * </p>
+ */
 public class SparkSessionImpl implements SparkSession {
+
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
   private static final String SPARK_DIR = "_spark_session_dir";
 
@@ -65,13 +90,33 @@ public class SparkSessionImpl implements SparkSession {
   /** Pre-compiled error patterns. Shared between all Spark sessions */
   private static Map<String, Pattern> errorPatterns;
 
-  private HiveConf conf;
-  private boolean isOpen;
+  // Several of the following variables need to be volatile so they can be accessed by the timeout
+  // thread
+
+  private volatile HiveConf conf;
+  private volatile boolean isOpen;
   private final String sessionId;
-  private HiveSparkClient hiveSparkClient;
-  private Path scratchDir;
+  private volatile HiveSparkClient hiveSparkClient;
+  private volatile Path scratchDir;
   private final Object dirLock = new Object();
 
+  /**
+   * The timestamp of the last completed Spark job.
+   */
+  private volatile long lastSparkJobCompletionTime;
+
+  /**
+   * A {@link Set} of currently running queries. Each job is identified by its query id.
+   */
+  private final Set<String> activeJobs = Sets.newConcurrentHashSet();
+
+  /**
+   * True if at least a single query has been run by this session, false otherwise.
+   */
+  private volatile boolean queryCompleted;
+
+  private ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
   SparkSessionImpl(String sessionId) {
     this.sessionId = sessionId;
     initErrorPatterns();
@@ -79,67 +124,87 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    LOG.info("Trying to open Hive on Spark session {}", sessionId);
-    this.conf = conf;
-    isOpen = true;
+    closeLock.readLock().lock();
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+      LOG.info("Trying to open Hive on Spark session {}", sessionId);
+      this.conf = conf;
+      isOpen = true;
+      try {
+        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
               SessionState.get().getSessionId());
-    } catch (Throwable e) {
-      // It's possible that user session is closed while creating Spark client.
-      HiveException he;
-      if (isOpen) {
-        he = getHiveException(e);
-      } else {
-        he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      } catch (Throwable e) {
+        // It's possible that user session is closed while creating Spark client.
+        HiveException he;
+        if (isOpen) {
+          he = getHiveException(e);
+        } else {
+          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+        }
+        throw he;
       }
-      throw he;
+      LOG.info("Hive on Spark session {} successfully opened", sessionId);
+    } finally {
+      closeLock.readLock().unlock();
     }
-    LOG.info("Hive on Spark session {} successfully opened", sessionId);
   }
 
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
-    Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
-    return hiveSparkClient.execute(driverContext, sparkWork);
+    closeLock.readLock().lock();
+    try {
+      Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
+      return hiveSparkClient.execute(driverContext, sparkWork);
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }
 
   @Override
   public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception {
-    SparkConf sparkConf = hiveSparkClient.getSparkConf();
-    int numExecutors = hiveSparkClient.getExecutorCount();
-    // at start-up, we may be unable to get number of executors
-    if (numExecutors <= 0) {
-      return new ObjectPair<Long, Integer>(-1L, -1);
-    }
-    int executorMemoryInMB = Utils.memoryStringToMb(
-        sparkConf.get("spark.executor.memory", "512m"));
-    double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
-    long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
-    int totalCores;
-    String masterURL = sparkConf.get("spark.master");
-    if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
-      totalCores = sparkConf.contains("spark.default.parallelism") ?
-          sparkConf.getInt("spark.default.parallelism", 1) :
-          hiveSparkClient.getDefaultParallelism();
-      totalCores = Math.max(totalCores, numExecutors);
-    } else {
-      int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
-      totalCores = numExecutors * coresPerExecutor;
+    closeLock.readLock().lock();
+    try {
+      SparkConf sparkConf = hiveSparkClient.getSparkConf();
+      int numExecutors = hiveSparkClient.getExecutorCount();
+      // at start-up, we may be unable to get number of executors
+      if (numExecutors <= 0) {
+        return new ObjectPair<Long, Integer>(-1L, -1);
+      }
+      int executorMemoryInMB = Utils.memoryStringToMb(
+              sparkConf.get("spark.executor.memory", "512m"));
+      double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+      long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+      int totalCores;
+      String masterURL = sparkConf.get("spark.master");
+      if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
+        totalCores = sparkConf.contains("spark.default.parallelism") ?
+                sparkConf.getInt("spark.default.parallelism", 1) :
+                hiveSparkClient.getDefaultParallelism();
+        totalCores = Math.max(totalCores, numExecutors);
+      } else {
+        int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+        totalCores = numExecutors * coresPerExecutor;
+      }
+      totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+
+      long memoryPerTaskInBytes = totalMemory / totalCores;
+      LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
+              + ", total cores: " + totalCores + ", memory per executor: "
+              + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
+      return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+              Integer.valueOf(totalCores));
+    } finally {
+      closeLock.readLock().unlock();
     }
-    totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
-
-    long memoryPerTaskInBytes = totalMemory / totalCores;
-    LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
-        + ", total cores: " + totalCores + ", memory per executor: "
-        + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
-    return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
-        Integer.valueOf(totalCores));
   }
 
   @Override
   public boolean isOpen() {
-    return isOpen;
+    closeLock.readLock().lock();
+    try {
+      return isOpen;
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -154,18 +219,29 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public void close() {
-    LOG.info("Trying to close Hive on Spark session {}", sessionId);
-    isOpen = false;
-    if (hiveSparkClient != null) {
+    if (isOpen) {
+      closeLock.writeLock().lock();
       try {
-        hiveSparkClient.close();
-        LOG.info("Hive on Spark session {} successfully closed", sessionId);
-        cleanScratchDir();
-      } catch (IOException e) {
-        LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+        if (isOpen) {
+          LOG.info("Trying to close Hive on Spark session {}", sessionId);
+          isOpen = false;
+          if (hiveSparkClient != null) {
+            try {
+              hiveSparkClient.close();
+              LOG.info("Hive on Spark session {} successfully closed", sessionId);
+              cleanScratchDir();
+            } catch (IOException e) {
+              LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+            }
+          }
+          hiveSparkClient = null;
+          queryCompleted = false;
+          lastSparkJobCompletionTime = 0;
+        }
+      } finally {
+        closeLock.writeLock().unlock();
       }
     }
-    hiveSparkClient = null;
   }
 
   private Path createScratchDir() throws IOException {
@@ -261,6 +337,60 @@ public class SparkSessionImpl implements SparkSession {
     return scratchDir;
   }
 
+  @Override
+  public void onQuerySubmission(String queryId) {
+    activeJobs.add(queryId); // a non-empty activeJobs set keeps hasTimedOut from firing
+  }
+
+  /** Closes this session if it has been idle for more than sessionTimeout milliseconds. */
+  @Override
+  public boolean triggerTimeout(long sessionTimeout) {
+    if (!hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+      return false;
+    }
+    // Re-evaluate under the write lock: the state may have changed while waiting for it.
+    closeLock.writeLock().lock();
+    try {
+      if (!hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+        return false;
+      }
+      LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
+              "been run in the past " + sessionTimeout / 1000 + " seconds");
+      close();
+      return true;
+    } finally {
+      closeLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns true if a session has timed out, false otherwise. The following conditions must be met
+   * in order to consider a session as timed out: (1) the session must have completed at least one
+   * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
+   * must have finished more than {@code sessionTimeout} milliseconds ago.
+   */
+  private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+                                     long lastSparkJobCompletionTime, long sessionTimeout) {
+    return queryCompleted &&
+            activeJobs.isEmpty() &&
+            lastSparkJobCompletionTime > 0 &&
+            (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
+  }
+
+  /**
+   * When this session completes the execution of a query, set the {@link #queryCompleted} flag,
+   * set {@link #lastSparkJobCompletionTime} to the current timestamp, and only then remove the
+   * query from the set of actively running jobs.
+   */
+  @Override
+  public void onQueryCompletion(String queryId) {
+    queryCompleted = true;
+    // Update the timestamp before emptying activeJobs: the timeout thread must never see an
+    // empty set paired with the stale completion time of an earlier query.
+    lastSparkJobCompletionTime = System.currentTimeMillis();
+    activeJobs.remove(queryId);
+  }
+
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;

http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 68c9e04..79a56bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -18,19 +18,23 @@
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
 
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.spark.client.SparkClientFactory;
 
 /**
@@ -42,8 +46,16 @@ import org.apache.hive.spark.client.SparkClientFactory;
 public class SparkSessionManagerImpl implements SparkSessionManager {
   private static final Logger LOG = LoggerFactory.getLogger(SparkSessionManagerImpl.class);
 
-  private Set<SparkSession> createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
+  private final Set<SparkSession> createdSessions = Sets.newConcurrentHashSet();
+
+  /**
+   * A {@link Future} that tracks the status of the scheduled time out thread launched via the
+   * {@link #startTimeoutThread()} method.
+   */
+  private volatile Future<?> timeoutFuture;
+
   private volatile boolean inited = false;
+  private volatile HiveConf conf;
 
   private static SparkSessionManagerImpl instance;
 
@@ -80,9 +92,11 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
       synchronized (this) {
         if (!inited) {
           LOG.info("Setting up the session manager.");
-          Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
+          conf = hiveConf;
+          startTimeoutThread();
+          Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
           try {
-            SparkClientFactory.initialize(conf);
+            SparkClientFactory.initialize(sparkConf);
             inited = true;
           } catch (IOException e) {
             throw new HiveException("Error initializing SparkClientFactory", e);
@@ -136,7 +150,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
+      LOG.debug(String.format("Closing Spark session (%s).", sparkSession.getSessionId()));
     }
     sparkSession.close();
     createdSessions.remove(sparkSession);
@@ -145,15 +159,32 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
   @Override
   public void shutdown() {
     LOG.info("Closing the session manager.");
-    synchronized (createdSessions) {
-      Iterator<SparkSession> it = createdSessions.iterator();
-      while (it.hasNext()) {
-        SparkSession session = it.next();
-        session.close();
-      }
-      createdSessions.clear();
+    if (timeoutFuture != null) {
+      timeoutFuture.cancel(false);
     }
+    createdSessions.forEach(SparkSession::close);
+    createdSessions.clear();
     inited = false;
     SparkClientFactory.stop();
   }
+
+  /**
+   * Starts a scheduled thread that periodically calls {@link SparkSession#triggerTimeout(long)}
+   * on each managed {@link SparkSession} and forgets the sessions that timed out.
+   */
+  private void startTimeoutThread() {
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
+            TimeUnit.MILLISECONDS);
+    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+            TimeUnit.MILLISECONDS);
+    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
+    // An exception escaping the task would cancel all later runs, so log it and carry on.
+    timeoutFuture = es.scheduleAtFixedRate(() -> {
+      try {
+        createdSessions.removeIf(sparkSession -> sparkSession.triggerTimeout(sessionTimeout));
+      } catch (RuntimeException e) {
+        LOG.error("Error while checking Spark sessions for timeouts", e);
+      }
+    }, 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+  }
 }