Posted to commits@hive.apache.org by st...@apache.org on 2018/08/31 20:43:38 UTC
hive git commit: HIVE-14162: Allow disabling of long running job on
Hive On Spark On YARN (Sahil Takiar, reviewed by Adam Szita)
Repository: hive
Updated Branches:
refs/heads/master 219538701 -> 189d3fec2
HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN (Sahil Takiar, reviewed by Adam Szita)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/189d3fec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/189d3fec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/189d3fec
Branch: refs/heads/master
Commit: 189d3fec25dfb94b209b1a34c1be674ce9d85bc5
Parents: 2195387
Author: Sahil Takiar <ta...@gmail.com>
Authored: Mon Jul 16 10:26:21 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Aug 31 15:42:57 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../ql/exec/spark/TestSparkSessionTimeout.java | 145 +++++++++++
.../java/org/apache/hadoop/hive/ql/Driver.java | 10 +
.../ql/exec/spark/session/SparkSession.java | 27 ++
.../ql/exec/spark/session/SparkSessionImpl.java | 246 ++++++++++++++-----
.../spark/session/SparkSessionManagerImpl.java | 63 +++--
6 files changed, 423 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8c39de3..40ea3ac 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4311,6 +4311,12 @@ public class HiveConf extends Configuration {
"specified (default) then the spark-submit shell script is used to launch the Spark " +
"app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
"InProcessLauncher is used to programmatically launch the app."),
+ SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.MINUTES,
+ 30L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+ " a Spark job to be submitted before shutting down. Minimum value is 30 minutes"),
+ SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "60s",
+ new TimeValidator(TimeUnit.SECONDS, 60L, true, null, true),
+ "How frequently to check for idle Spark sessions. Minimum value is 60 seconds."),
NWAYJOINREORDER("hive.reorder.nway.joins", true,
"Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
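
For reference, a minimal sketch of how a client could set the two new
properties programmatically (values are illustrative; HiveConf, ConfVars,
and setVar are used the same way in the test below):

    // Illustrative only: close idle Spark sessions after 45 minutes,
    // checking for idleness every 2 minutes. The TimeValidators above
    // declare minimums of 30 minutes and 60 seconds; setVar, as used in
    // the test below, does not go through "set"-command validation.
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "45m");
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "120s");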
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
new file mode 100644
index 0000000..c887297
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class TestSparkSessionTimeout {
+
+ @Test
+ public void testSparkSessionTimeout() throws HiveException, InterruptedException, MalformedURLException {
+ String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+ HiveConf conf = new HiveConf();
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString());
+
+ SessionState.start(conf);
+
+ runTestSparkSessionTimeout(conf);
+ }
+
+ @Test
+ public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
+ ExecutionException {
+ List<Future<Void>> futures = new ArrayList<>();
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ futures.add(es.submit(() -> {
+ String confDir = "../../data/conf/spark/local/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+ HiveConf conf = new HiveConf();
+ conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+
+ SessionState.start(conf);
+
+ runTestSparkSessionTimeout(conf);
+ return null;
+ }));
+ }
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ }
+
+ @Test
+ public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException {
+ List<Future<Void>> futures = new ArrayList<>();
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ futures.add(es.submit(() -> {
+ String confDir = "../../data/conf/spark/local/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+ HiveConf conf = new HiveConf();
+ conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+
+ SessionState.start(conf);
+
+ runTestSparkSessionTimeout(conf);
+ return null;
+ }));
+ }
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ }
+
+ private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
+ InterruptedException {
+ conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
+ conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+ String tableName = "test" + UUID.randomUUID().toString().replace("-", "");
+
+ Driver driver = null;
+
+ try {
+ driver = new Driver(new QueryState.Builder()
+ .withGenerateNewQueryId(true)
+ .withHiveConf(conf).build(),
+ null, null);
+
+ SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl
+ .getInstance());
+
+ Assert.assertEquals(0,
+ driver.run("create table " + tableName + " (col int)").getResponseCode());
+ Assert.assertEquals(0,
+ driver.run("select * from " + tableName + " order by col").getResponseCode());
+
+ Thread.sleep(10000);
+
+ Assert.assertFalse(sparkSession.isOpen());
+
+ Assert.assertEquals(0,
+ driver.run("select * from " + tableName + " order by col").getResponseCode());
+ } finally {
+ if (driver != null) {
+ Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode());
+ driver.destroy();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 737debd..dad2035 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.Entity.Type;
@@ -544,6 +545,11 @@ public class Driver implements IDriver {
String queryId = queryState.getQueryId();
+ SparkSession ss = SessionState.get().getSparkSession();
+ if (ss != null) {
+ ss.onQuerySubmission(queryId);
+ }
+
if (ctx != null) {
setTriggerContext(queryId);
}
@@ -2570,6 +2576,10 @@ public class Driver implements IDriver {
queryState.setNumModifiedRows(numModifiedRows);
console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
}
+ SparkSession ss = SessionState.get().getSparkSession();
+ if (ss != null) {
+ ss.onQueryCompletion(queryId);
+ }
lDrvState.stateLock.lock();
try {
lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
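
Taken together, the two hooks above bracket each query. A hedged sketch of
the pairing they establish (the real Driver invokes them at query-id
assignment and near the end of execution, not in a literal try/finally;
runWithSessionHooks and runQuery are hypothetical stand-ins):

    // Hedged sketch of the Driver hooks added above; not the actual
    // control flow, just the submission/completion pairing.
    void runWithSessionHooks(String queryId, Runnable runQuery) {
      SparkSession ss = SessionState.get().getSparkSession();
      if (ss != null) {
        ss.onQuerySubmission(queryId);     // queryId enters the active set
      }
      try {
        runQuery.run();                    // compile + execute the query
      } finally {
        SparkSession done = SessionState.get().getSparkSession();
        if (done != null) {
          done.onQueryCompletion(queryId); // queryId leaves the active set;
                                           // completion timestamp recorded
        }
      }
    }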
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index f96a8f7..62f88c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
import java.io.IOException;
public interface SparkSession {
+
/**
* Initializes a Spark session for DAG execution.
* @param conf Hive configuration.
@@ -75,4 +76,30 @@ public interface SparkSession {
* Get an HDFS dir specific to the SparkSession
* */
Path getHDFSSessionDir() throws IOException;
+
+ /**
+ * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+ * query has completed.
+ *
+ * @param queryId the id of the query that completed
+ */
+ void onQueryCompletion(String queryId);
+
+ /**
+ * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+ * query has been submitted.
+ *
+ * @param queryId the id of the query that was submitted
+ */
+ void onQuerySubmission(String queryId);
+
+ /**
+ * Checks if a session has timed out, and closes the session if the timeout has occurred;
+ * returns true if the session timed out, and false otherwise.
+ *
+ * @param sessionTimeout the session timeout
+ *
+ * @return true if the session timed out and was closed, false otherwise
+ */
+ boolean triggerTimeout(long sessionTimeout);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 2015810..6a8b42e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
import java.io.IOException;
import java.util.Map;
-import java.util.UUID;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -28,13 +30,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.session.SessionState;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -43,12 +49,31 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+
import org.apache.spark.SparkConf;
import org.apache.spark.util.Utils;
import com.google.common.base.Preconditions;
+/**
+ * Implementation of {@link SparkSession} that treats each Spark session as a separate Spark
+ * application.
+ *
+ * <p>
+ * It uses a {@link HiveSparkClient} to submit a Spark application and to submit Spark jobs to
+ * the Spark app.
+ * </p>
+ *
+ * <p>
+ * This class contains logic to trigger a timeout of this {@link SparkSession} if certain
+ * conditions are met (e.g. a job hasn't been submitted in the past "x" seconds). Since we use
+ * a threadpool to schedule a task that regularly checks if a session has timed out, we need to
+ * properly synchronize the {@link #open(HiveConf)} and {@link #close()} methods. We use a
+ * series of volatile variables and read-write locks to ensure this.
+ * </p>
+ */
public class SparkSessionImpl implements SparkSession {
+
private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
private static final String SPARK_DIR = "_spark_session_dir";
@@ -65,13 +90,33 @@ public class SparkSessionImpl implements SparkSession {
/** Pre-compiled error patterns. Shared between all Spark sessions */
private static Map<String, Pattern> errorPatterns;
- private HiveConf conf;
- private boolean isOpen;
+ // Several of the following variables need to be volatile so they can be accessed by the timeout
+ // thread
+
+ private volatile HiveConf conf;
+ private volatile boolean isOpen;
private final String sessionId;
- private HiveSparkClient hiveSparkClient;
- private Path scratchDir;
+ private volatile HiveSparkClient hiveSparkClient;
+ private volatile Path scratchDir;
private final Object dirLock = new Object();
+ /**
+ * The timestamp of the last completed Spark job.
+ */
+ private volatile long lastSparkJobCompletionTime;
+
+ /**
+ * A {@link Set} of currently running queries. Each query is identified by its query id.
+ */
+ private final Set<String> activeJobs = Sets.newConcurrentHashSet();
+
+ /**
+ * True if at least one query has been completed by this session, false otherwise.
+ */
+ private volatile boolean queryCompleted;
+
+ private ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
SparkSessionImpl(String sessionId) {
this.sessionId = sessionId;
initErrorPatterns();
@@ -79,67 +124,87 @@ public class SparkSessionImpl implements SparkSession {
@Override
public void open(HiveConf conf) throws HiveException {
- LOG.info("Trying to open Hive on Spark session {}", sessionId);
- this.conf = conf;
- isOpen = true;
+ closeLock.readLock().lock();
try {
- hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+ LOG.info("Trying to open Hive on Spark session {}", sessionId);
+ this.conf = conf;
+ isOpen = true;
+ try {
+ hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
SessionState.get().getSessionId());
- } catch (Throwable e) {
- // It's possible that user session is closed while creating Spark client.
- HiveException he;
- if (isOpen) {
- he = getHiveException(e);
- } else {
- he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+ } catch (Throwable e) {
+ // It's possible that user session is closed while creating Spark client.
+ HiveException he;
+ if (isOpen) {
+ he = getHiveException(e);
+ } else {
+ he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+ }
+ throw he;
}
- throw he;
+ LOG.info("Hive on Spark session {} successfully opened", sessionId);
+ } finally {
+ closeLock.readLock().unlock();
}
- LOG.info("Hive on Spark session {} successfully opened", sessionId);
}
@Override
public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
- Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
- return hiveSparkClient.execute(driverContext, sparkWork);
+ closeLock.readLock().lock();
+ try {
+ Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
+ return hiveSparkClient.execute(driverContext, sparkWork);
+ } finally {
+ closeLock.readLock().unlock();
+ }
}
@Override
public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception {
- SparkConf sparkConf = hiveSparkClient.getSparkConf();
- int numExecutors = hiveSparkClient.getExecutorCount();
- // at start-up, we may be unable to get number of executors
- if (numExecutors <= 0) {
- return new ObjectPair<Long, Integer>(-1L, -1);
- }
- int executorMemoryInMB = Utils.memoryStringToMb(
- sparkConf.get("spark.executor.memory", "512m"));
- double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
- long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
- int totalCores;
- String masterURL = sparkConf.get("spark.master");
- if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
- totalCores = sparkConf.contains("spark.default.parallelism") ?
- sparkConf.getInt("spark.default.parallelism", 1) :
- hiveSparkClient.getDefaultParallelism();
- totalCores = Math.max(totalCores, numExecutors);
- } else {
- int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
- totalCores = numExecutors * coresPerExecutor;
+ closeLock.readLock().lock();
+ try {
+ SparkConf sparkConf = hiveSparkClient.getSparkConf();
+ int numExecutors = hiveSparkClient.getExecutorCount();
+ // at start-up, we may be unable to get number of executors
+ if (numExecutors <= 0) {
+ return new ObjectPair<Long, Integer>(-1L, -1);
+ }
+ int executorMemoryInMB = Utils.memoryStringToMb(
+ sparkConf.get("spark.executor.memory", "512m"));
+ double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+ long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+ int totalCores;
+ String masterURL = sparkConf.get("spark.master");
+ if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
+ totalCores = sparkConf.contains("spark.default.parallelism") ?
+ sparkConf.getInt("spark.default.parallelism", 1) :
+ hiveSparkClient.getDefaultParallelism();
+ totalCores = Math.max(totalCores, numExecutors);
+ } else {
+ int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+ totalCores = numExecutors * coresPerExecutor;
+ }
+ totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+
+ long memoryPerTaskInBytes = totalMemory / totalCores;
+ LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
+ + ", total cores: " + totalCores + ", memory per executor: "
+ + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
+ return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+ Integer.valueOf(totalCores));
+ } finally {
+ closeLock.readLock().unlock();
}
- totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
-
- long memoryPerTaskInBytes = totalMemory / totalCores;
- LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
- + ", total cores: " + totalCores + ", memory per executor: "
- + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
- return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
- Integer.valueOf(totalCores));
}
@Override
public boolean isOpen() {
- return isOpen;
+ closeLock.readLock().lock();
+ try {
+ return isOpen;
+ } finally {
+ closeLock.readLock().unlock();
+ }
}
@Override
@@ -154,18 +219,29 @@ public class SparkSessionImpl implements SparkSession {
@Override
public void close() {
- LOG.info("Trying to close Hive on Spark session {}", sessionId);
- isOpen = false;
- if (hiveSparkClient != null) {
+ if (isOpen) {
+ closeLock.writeLock().lock();
try {
- hiveSparkClient.close();
- LOG.info("Hive on Spark session {} successfully closed", sessionId);
- cleanScratchDir();
- } catch (IOException e) {
- LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+ if (isOpen) {
+ LOG.info("Trying to close Hive on Spark session {}", sessionId);
+ isOpen = false;
+ if (hiveSparkClient != null) {
+ try {
+ hiveSparkClient.close();
+ LOG.info("Hive on Spark session {} successfully closed", sessionId);
+ cleanScratchDir();
+ } catch (IOException e) {
+ LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+ }
+ }
+ hiveSparkClient = null;
+ queryCompleted = false;
+ lastSparkJobCompletionTime = 0;
+ }
+ } finally {
+ closeLock.writeLock().unlock();
}
}
- hiveSparkClient = null;
}
private Path createScratchDir() throws IOException {
@@ -261,6 +337,60 @@ public class SparkSessionImpl implements SparkSession {
return scratchDir;
}
+ @Override
+ public void onQuerySubmission(String queryId) {
+ activeJobs.add(queryId);
+ }
+
+ /**
+ * Check if a session has timed out, and if it has, close the session.
+ */
+ @Override
+ public boolean triggerTimeout(long sessionTimeout) {
+ if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+ closeLock.writeLock().lock();
+ try {
+ if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+ LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
+ "been run in the past " + sessionTimeout / 1000 + " seconds");
+ close();
+ return true;
+ }
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if a session has timed out, false otherwise. The following conditions must be met
+ * in order to consider a session as timed out: (1) the session must have run at least one
+ * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
+ * must have completed more than sessionTimeout milliseconds ago.
+ */
+ private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+ long lastSparkJobCompletionTime, long sessionTimeout) {
+ return queryCompleted &&
+ activeJobs.isEmpty() &&
+ lastSparkJobCompletionTime > 0 &&
+ (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
+ }
+
+ /**
+ * When this session completes the execution of a query, set the {@link #queryCompleted} flag
+ * to true if it hasn't already been set, remove the query from the set of actively running queries,
+ * and set the {@link #lastSparkJobCompletionTime} to the current timestamp.
+ */
+ @Override
+ public void onQueryCompletion(String queryId) {
+ if (!queryCompleted) {
+ queryCompleted = true;
+ }
+ activeJobs.remove(queryId);
+ lastSparkJobCompletionTime = System.currentTimeMillis();
+ }
+
@VisibleForTesting
HiveSparkClient getHiveSparkClient() {
return hiveSparkClient;
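
The locking in this class follows a double-checked pattern: open(),
submit(), and isOpen() take the read lock, while triggerTimeout() performs
a cheap unlocked check and then re-checks under the write lock before
calling close(). A minimal standalone sketch of that pattern (every name
here is illustrative, not a Hive API):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Minimal, illustrative model of the double-checked close in
    // SparkSessionImpl above.
    class TimedResource {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile boolean open = true;
      private volatile long lastUseMillis = System.currentTimeMillis();

      // Analogous to submit(): readers share the read lock, so normal
      // work only blocks on an in-progress close, not on other readers.
      boolean use() {
        lock.readLock().lock();
        try {
          if (!open) {
            return false;
          }
          lastUseMillis = System.currentTimeMillis();
          return true;
        } finally {
          lock.readLock().unlock();
        }
      }

      // Analogous to triggerTimeout(long): cheap unlocked check first,
      // then a re-check under the write lock so a concurrent use() that
      // slipped in cannot be lost.
      boolean triggerTimeout(long timeoutMillis) {
        if (expired(timeoutMillis)) {
          lock.writeLock().lock();
          try {
            if (expired(timeoutMillis)) {
              open = false; // analogous to close()
              return true;
            }
          } finally {
            lock.writeLock().unlock();
          }
        }
        return false;
      }

      private boolean expired(long timeoutMillis) {
        return open && System.currentTimeMillis() - lastUseMillis > timeoutMillis;
      }
    }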
http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 68c9e04..79a56bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -18,19 +18,23 @@
package org.apache.hadoop.hive.ql.exec.spark.session;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.spark.client.SparkClientFactory;
/**
@@ -42,8 +46,16 @@ import org.apache.hive.spark.client.SparkClientFactory;
public class SparkSessionManagerImpl implements SparkSessionManager {
private static final Logger LOG = LoggerFactory.getLogger(SparkSessionManagerImpl.class);
- private Set<SparkSession> createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
+ private final Set<SparkSession> createdSessions = Sets.newConcurrentHashSet();
+
+ /**
+ * A {@link Future} that tracks the status of the scheduled timeout thread launched via the
+ * {@link #startTimeoutThread()} method.
+ */
+ private volatile Future<?> timeoutFuture;
+
private volatile boolean inited = false;
+ private volatile HiveConf conf;
private static SparkSessionManagerImpl instance;
@@ -80,9 +92,11 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
synchronized (this) {
if (!inited) {
LOG.info("Setting up the session manager.");
- Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
+ conf = hiveConf;
+ startTimeoutThread();
+ Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
try {
- SparkClientFactory.initialize(conf);
+ SparkClientFactory.initialize(sparkConf);
inited = true;
} catch (IOException e) {
throw new HiveException("Error initializing SparkClientFactory", e);
@@ -136,7 +150,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
}
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
+ LOG.debug(String.format("Closing Spark session (%s).", sparkSession.getSessionId()));
}
sparkSession.close();
createdSessions.remove(sparkSession);
@@ -145,15 +159,32 @@ public class SparkSessionManagerImpl implements SparkSessionManager {
@Override
public void shutdown() {
LOG.info("Closing the session manager.");
- synchronized (createdSessions) {
- Iterator<SparkSession> it = createdSessions.iterator();
- while (it.hasNext()) {
- SparkSession session = it.next();
- session.close();
- }
- createdSessions.clear();
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
}
+ createdSessions.forEach(SparkSession::close);
+ createdSessions.clear();
inited = false;
SparkClientFactory.stop();
}
+
+ /**
+ * Starts a scheduled thread that periodically calls {@link SparkSession#triggerTimeout(long)}
+ * on each {@link SparkSession} managed by this class.
+ */
+ private void startTimeoutThread() {
+ long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+ TimeUnit.MILLISECONDS);
+ ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
+
+ // Schedules a thread that does the following: iterates through all the active SparkSessions
+ // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
+ // the SparkSession is removed from the set of active sessions managed by this class.
+ timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
+ .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
+ .forEach(createdSessions::remove),
+ 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+ }
}
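
For context, the sweep scheduled by startTimeoutThread() in isolation: a
single-threaded ScheduledExecutorService periodically asks each live
session to triggerTimeout(long) and drops the ones that report true. A
hedged sketch compiled against the SparkSession interface from this patch
(removeIf is behaviorally equivalent to the stream/filter/forEach form
above for a concurrent set; the class and method names are illustrative):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;

    // Illustrative stand-alone version of startTimeoutThread()/shutdown().
    class SessionTimeoutSweeper {
      private final Set<SparkSession> sessions = ConcurrentHashMap.newKeySet();
      private final ScheduledExecutorService es =
          Executors.newSingleThreadScheduledExecutor();
      private volatile Future<?> timeoutFuture;

      void start(long sessionTimeoutMs, long checkPeriodMs) {
        // Each sweep asks every live session whether it has timed out;
        // sessions that close themselves are dropped from the live set.
        timeoutFuture = es.scheduleAtFixedRate(
            () -> sessions.removeIf(s -> s.triggerTimeout(sessionTimeoutMs)),
            0, checkPeriodMs, TimeUnit.MILLISECONDS);
      }

      void stop() {
        if (timeoutFuture != null) {
          timeoutFuture.cancel(false); // mirrors the cancel in shutdown() above
        }
      }
    }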