You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:11 UTC
[61/75] [abbrv] hive git commit: HIVE-20737: Local SparkContext is
shared between user sessions and should be closed only when there are no
active sessions (Denys Kuzmenko reviewed by Antal Sinkovits,
Sahil Takiar and Peter Vary)
HIVE-20737: Local SparkContext is shared between user sessions and should be closed only when there are no active sessions (Denys Kuzmenko reviewed by Antal Sinkovits, Sahil Takiar and Peter Vary)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/36015eab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/36015eab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/36015eab
Branch: refs/heads/master-tez092
Commit: 36015eab482b5fc5c55ac02f7fa3085d6d877b09
Parents: 1a1d6ca
Author: denys kuzmenko <dk...@cloudera.com>
Authored: Wed Oct 24 10:26:52 2018 +0200
Committer: Peter Vary <pv...@cloudera.com>
Committed: Wed Oct 24 10:26:52 2018 +0200
----------------------------------------------------------------------
.../ql/exec/spark/LocalHiveSparkClient.java | 38 ++---
.../ql/exec/spark/session/SparkSessionImpl.java | 73 ++++-----
.../ql/exec/spark/TestLocalHiveSparkClient.java | 149 +++++++++++++++++++
3 files changed, 200 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e..6c6122a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,7 +27,6 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -62,32 +61,34 @@ import com.google.common.base.Strings;
* environment and execute spark work.
*/
public class LocalHiveSparkClient implements HiveSparkClient {
+
private static final long serialVersionUID = 1L;
- private static final String MR_JAR_PROPERTY = "tmpjars";
- protected static final transient Logger LOG = LoggerFactory
- .getLogger(LocalHiveSparkClient.class);
+ private static final transient Logger LOG = LoggerFactory
+ .getLogger(LocalHiveSparkClient.class);
+ private static final String MR_JAR_PROPERTY = "tmpjars";
private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
private static LocalHiveSparkClient client;
-
- public static synchronized LocalHiveSparkClient getInstance(
- SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
- if (client == null) {
- client = new LocalHiveSparkClient(sparkConf, hiveConf);
- }
- return client;
- }
+ private int activeSessions = 0;
private final JavaSparkContext sc;
private final List<String> localJars = new ArrayList<String>();
-
private final List<String> localFiles = new ArrayList<String>();
private final JobMetricsListener jobMetricsListener;
+ public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf, HiveConf hiveConf)
+ throws FileNotFoundException, MalformedURLException {
+ if (client == null) {
+ client = new LocalHiveSparkClient(sparkConf, hiveConf);
+ }
+ ++client.activeSessions;
+ return client;
+ }
+
private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
throws FileNotFoundException, MalformedURLException {
String regJar = null;
@@ -239,10 +240,13 @@ public class LocalHiveSparkClient implements HiveSparkClient {
@Override
public void close() {
synchronized (LocalHiveSparkClient.class) {
- client = null;
- }
- if (sc != null) {
- sc.stop();
+ if (--activeSessions == 0) {
+ client = null;
+ if (sc != null) {
+ LOG.debug("Shutting down the SparkContext");
+ sc.stop();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index bb50129..1d251ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -109,11 +109,6 @@ public class SparkSessionImpl implements SparkSession {
*/
private final Set<String> activeJobs = Sets.newConcurrentHashSet();
- /**
- * True if at least a single query has been run by this session, false otherwise.
- */
- private volatile boolean queryCompleted;
-
private ReadWriteLock closeLock = new ReentrantReadWriteLock();
SparkSessionImpl(String sessionId) {
@@ -123,27 +118,26 @@ public class SparkSessionImpl implements SparkSession {
@Override
public void open(HiveConf conf) throws HiveException {
- closeLock.readLock().lock();
+ closeLock.writeLock().lock();
+
try {
- LOG.info("Trying to open Hive on Spark session {}", sessionId);
- this.conf = conf;
- isOpen = true;
- try {
- hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
- SessionState.get().getSessionId());
- } catch (Throwable e) {
- // It's possible that user session is closed while creating Spark client.
- HiveException he;
- if (isOpen) {
- he = getHiveException(e);
- } else {
- he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+ if (!isOpen) {
+ LOG.info("Trying to open Hive on Spark session {}", sessionId);
+ this.conf = conf;
+
+ try {
+ hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+ SessionState.get().getSessionId());
+ isOpen = true;
+ } catch (Throwable e) {
+ throw getHiveException(e);
}
- throw he;
+ LOG.info("Hive on Spark session {} successfully opened", sessionId);
+ } else {
+ LOG.info("Hive on Spark session {} is already opened", sessionId);
}
- LOG.info("Hive on Spark session {} successfully opened", sessionId);
} finally {
- closeLock.readLock().unlock();
+ closeLock.writeLock().unlock();
}
}
@@ -198,12 +192,7 @@ public class SparkSessionImpl implements SparkSession {
@Override
public boolean isOpen() {
- closeLock.readLock().lock();
- try {
- return isOpen;
- } finally {
- closeLock.readLock().unlock();
- }
+ return isOpen;
}
@Override
@@ -220,10 +209,10 @@ public class SparkSessionImpl implements SparkSession {
public void close() {
if (isOpen) {
closeLock.writeLock().lock();
+
try {
if (isOpen) {
LOG.info("Trying to close Hive on Spark session {}", sessionId);
- isOpen = false;
if (hiveSparkClient != null) {
try {
hiveSparkClient.close();
@@ -234,8 +223,9 @@ public class SparkSessionImpl implements SparkSession {
}
}
hiveSparkClient = null;
- queryCompleted = false;
lastSparkJobCompletionTime = 0;
+
+ isOpen = false;
}
} finally {
closeLock.writeLock().unlock();
@@ -348,10 +338,11 @@ public class SparkSessionImpl implements SparkSession {
*/
@Override
public boolean triggerTimeout(long sessionTimeout) {
- if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+ if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
closeLock.writeLock().lock();
+
try {
- if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+ if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
"been run in the past " + sessionTimeout / 1000 + " seconds");
close();
@@ -366,28 +357,24 @@ public class SparkSessionImpl implements SparkSession {
/**
* Returns true if a session has timed out, false otherwise. The following conditions must be met
- * in order to consider a session as timed out: (1) the session must have run at least one
- * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
- * must have been more than sessionTimeout seconds ago.
+ * in order to consider a session as timed out:
+ * (1) the session must have run at least one query (i.e. lastSparkJobCompletionTime > 0),
+ * (2) there can be no actively running Spark jobs, and
+ * (3) the last completed Spark job must have been more than sessionTimeout milliseconds ago.
*/
- private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+ private static boolean hasTimedOut(Set<String> activeJobs,
long lastSparkJobCompletionTime, long sessionTimeout) {
- return queryCompleted &&
- activeJobs.isEmpty() &&
+ return activeJobs.isEmpty() &&
lastSparkJobCompletionTime > 0 &&
(System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
}
/**
- * When this session completes the execution of a query, set the {@link #queryCompleted} flag
- * to true if it hasn't already been set, remove the query from the list of actively running jobs,
+ * When this session completes the execution of a query, remove the query from the list of actively running jobs,
* and set the {@link #lastSparkJobCompletionTime} to the current timestamp.
*/
@Override
public void onQueryCompletion(String queryId) {
- if (!queryCompleted) {
- queryCompleted = true;
- }
activeJobs.remove(queryId);
lastSparkJobCompletionTime = System.currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
new file mode 100644
index 0000000..bbf3d9c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.List;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.fail;
+
+/**
+ * With local spark context, all user sessions share the same spark context.
+ */
+public class TestLocalHiveSparkClient {
+
+ private final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ @Test
+ public void testMultiSessionSparkContextReUse() throws MalformedURLException {
+ String confDir = "../data/conf/spark/local/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+ ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
+
+ List<CompletableFuture<Void>> futures =
+ IntStream.range(0, barrier.getParties()).boxed()
+ .map(i -> CompletableFuture.supplyAsync(() -> execute(i), executor))
+ .collect(Collectors.toList());
+
+ futures.forEach(CompletableFuture::join);
+ }
+
+ private Void execute(Integer threadId) {
+ HiveConf conf = new HiveConf();
+
+ conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestLocalHiveSparkClient-testMultiSessionSparkContextReuse-local-dir").toString());
+
+ SessionState.start(conf);
+ try{
+ runSparkTestSession(conf, threadId);
+ } catch (Exception ex){
+ fail(ex.getMessage());
+ }
+ return null;
+ }
+
+ private void runSparkTestSession(HiveConf conf, int threadId) throws Exception {
+ conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10s");
+ conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+ Driver driver = null;
+ try {
+ driver = new Driver(new QueryState.Builder()
+ .withGenerateNewQueryId(true)
+ .withHiveConf(conf).build(), null, null);
+
+ SparkSession sparkSession = SparkUtilities.getSparkSession(conf,
+ SparkSessionManagerImpl.getInstance());
+
+ Assert.assertEquals(0, driver.run("show tables").getResponseCode());
+ barrier.await();
+
+ SparkContext sparkContext = getSparkContext(sparkSession);
+ Assert.assertFalse(sparkContext.isStopped());
+
+ if(threadId == 1) {
+ barrier.await();
+ closeSparkSession(sparkSession);
+ Assert.assertTrue(sparkContext.isStopped());
+
+ } else {
+ closeSparkSession(sparkSession);
+ Assert.assertFalse(sparkContext.isStopped());
+ barrier.await();
+ }
+ } finally {
+ if (driver != null) {
+ driver.destroy();
+ }
+ }
+ }
+
+ private void closeSparkSession(SparkSession session) throws ReflectiveOperationException {
+ Assert.assertTrue(session.isOpen());
+ session.close();
+
+ Assert.assertFalse(session.isOpen());
+ }
+
+ private SparkContext getSparkContext(SparkSession sparkSession) throws ReflectiveOperationException {
+ HiveSparkClient sparkClient = getSparkClient(sparkSession);
+ Assert.assertNotNull(sparkClient);
+
+ return getSparkContext(sparkClient).sc();
+ }
+
+ private JavaSparkContext getSparkContext(HiveSparkClient sparkClient) throws ReflectiveOperationException {
+ Field sparkContextField = LocalHiveSparkClient.class.getDeclaredField("sc");
+ sparkContextField.setAccessible(true);
+
+ return (JavaSparkContext) sparkContextField.get(sparkClient);
+ }
+
+ private HiveSparkClient getSparkClient(SparkSession sparkSession) throws ReflectiveOperationException {
+ Field clientField = SparkSessionImpl.class.getDeclaredField("hiveSparkClient");
+ clientField.setAccessible(true);
+
+ return (HiveSparkClient) clientField.get(sparkSession);
+ }
+}