You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:11 UTC

[61/75] [abbrv] hive git commit: HIVE-20737: Local SparkContext is shared between user sessions and should be closed only when there is no active session (Denys Kuzmenko reviewed by Antal Sinkovits, Sahil Takiar and Peter Vary)

HIVE-20737: Local SparkContext is shared between user sessions and should be closed only when there is no active session (Denys Kuzmenko reviewed by Antal Sinkovits, Sahil Takiar and Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/36015eab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/36015eab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/36015eab

Branch: refs/heads/master-tez092
Commit: 36015eab482b5fc5c55ac02f7fa3085d6d877b09
Parents: 1a1d6ca
Author: denys kuzmenko <dk...@cloudera.com>
Authored: Wed Oct 24 10:26:52 2018 +0200
Committer: Peter Vary <pv...@cloudera.com>
Committed: Wed Oct 24 10:26:52 2018 +0200

----------------------------------------------------------------------
 .../ql/exec/spark/LocalHiveSparkClient.java     |  38 ++---
 .../ql/exec/spark/session/SparkSessionImpl.java |  73 ++++-----
 .../ql/exec/spark/TestLocalHiveSparkClient.java | 149 +++++++++++++++++++
 3 files changed, 200 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e..6c6122a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,32 +61,34 @@ import com.google.common.base.Strings;
  * environment and execute spark work.
  */
 public class LocalHiveSparkClient implements HiveSparkClient {
+
   private static final long serialVersionUID = 1L;
 
-  private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static final transient Logger LOG = LoggerFactory
-      .getLogger(LocalHiveSparkClient.class);
+  private static final transient Logger LOG = LoggerFactory
+          .getLogger(LocalHiveSparkClient.class);
 
+  private static final String MR_JAR_PROPERTY = "tmpjars";
   private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private static LocalHiveSparkClient client;
-
-  public static synchronized LocalHiveSparkClient getInstance(
-      SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
-    if (client == null) {
-      client = new LocalHiveSparkClient(sparkConf, hiveConf);
-    }
-    return client;
-  }
+  private int activeSessions = 0;
 
   private final JavaSparkContext sc;
 
   private final List<String> localJars = new ArrayList<String>();
-
   private final List<String> localFiles = new ArrayList<String>();
 
   private final JobMetricsListener jobMetricsListener;
 
+  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf, HiveConf hiveConf)
+      throws FileNotFoundException, MalformedURLException {
+    if (client == null) {
+      client = new LocalHiveSparkClient(sparkConf, hiveConf);
+    }
+    ++client.activeSessions;
+    return client;
+  }
+
   private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
       throws FileNotFoundException, MalformedURLException {
     String regJar = null;
@@ -239,10 +240,13 @@ public class LocalHiveSparkClient implements HiveSparkClient {
   @Override
   public void close() {
     synchronized (LocalHiveSparkClient.class) {
-      client = null;
-    }
-    if (sc != null) {
-      sc.stop();
+      if (--activeSessions == 0) {
+        client = null;
+        if (sc != null) {
+          LOG.debug("Shutting down the SparkContext");
+          sc.stop();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index bb50129..1d251ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -109,11 +109,6 @@ public class SparkSessionImpl implements SparkSession {
    */
   private final Set<String> activeJobs = Sets.newConcurrentHashSet();
 
-  /**
-   * True if at least a single query has been run by this session, false otherwise.
-   */
-  private volatile boolean queryCompleted;
-
   private ReadWriteLock closeLock = new ReentrantReadWriteLock();
 
   SparkSessionImpl(String sessionId) {
@@ -123,27 +118,26 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    closeLock.readLock().lock();
+    closeLock.writeLock().lock();
+
     try {
-      LOG.info("Trying to open Hive on Spark session {}", sessionId);
-      this.conf = conf;
-      isOpen = true;
-      try {
-        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
-              SessionState.get().getSessionId());
-      } catch (Throwable e) {
-        // It's possible that user session is closed while creating Spark client.
-        HiveException he;
-        if (isOpen) {
-          he = getHiveException(e);
-        } else {
-          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      if (!isOpen) {
+        LOG.info("Trying to open Hive on Spark session {}", sessionId);
+        this.conf = conf;
+
+        try {
+          hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+                  SessionState.get().getSessionId());
+          isOpen = true;
+        } catch (Throwable e) {
+          throw getHiveException(e);
         }
-        throw he;
+        LOG.info("Hive on Spark session {} successfully opened", sessionId);
+      } else {
+        LOG.info("Hive on Spark session {} is already opened", sessionId);
       }
-      LOG.info("Hive on Spark session {} successfully opened", sessionId);
     } finally {
-      closeLock.readLock().unlock();
+      closeLock.writeLock().unlock();
     }
   }
 
@@ -198,12 +192,7 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public boolean isOpen() {
-    closeLock.readLock().lock();
-    try {
-      return isOpen;
-    } finally {
-      closeLock.readLock().unlock();
-    }
+    return isOpen;
   }
 
   @Override
@@ -220,10 +209,10 @@ public class SparkSessionImpl implements SparkSession {
   public void close() {
     if (isOpen) {
       closeLock.writeLock().lock();
+
       try {
         if (isOpen) {
           LOG.info("Trying to close Hive on Spark session {}", sessionId);
-          isOpen = false;
           if (hiveSparkClient != null) {
             try {
               hiveSparkClient.close();
@@ -234,8 +223,9 @@ public class SparkSessionImpl implements SparkSession {
             }
           }
           hiveSparkClient = null;
-          queryCompleted = false;
           lastSparkJobCompletionTime = 0;
+
+          isOpen = false;
         }
       } finally {
         closeLock.writeLock().unlock();
@@ -348,10 +338,11 @@ public class SparkSessionImpl implements SparkSession {
    */
   @Override
   public boolean triggerTimeout(long sessionTimeout) {
-    if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+    if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
       closeLock.writeLock().lock();
+
       try {
-        if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+        if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
           LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
                   "been run in the past " + sessionTimeout / 1000 + " seconds");
           close();
@@ -366,28 +357,24 @@ public class SparkSessionImpl implements SparkSession {
 
   /**
    * Returns true if a session has timed out, false otherwise. The following conditions must be met
-   * in order to consider a session as timed out: (1) the session must have run at least one
-   * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
-   * must have been more than sessionTimeout seconds ago.
+   * in order to consider a session as timed out:
+   * (1) the session must have run at least one query (i.e. lastSparkJobCompletionTime > 0),
+   * (2) there can be no actively running Spark jobs, and
+   * (3) the last completed Spark job must have been more than sessionTimeout milliseconds ago.
    */
-  private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+  private static boolean hasTimedOut(Set<String> activeJobs,
                                      long lastSparkJobCompletionTime, long sessionTimeout) {
-    return queryCompleted &&
-            activeJobs.isEmpty() &&
+    return activeJobs.isEmpty() &&
             lastSparkJobCompletionTime > 0 &&
             (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
   }
 
   /**
-   * When this session completes the execution of a query, set the {@link #queryCompleted} flag
-   * to true if it hasn't already been set, remove the query from the list of actively running jobs,
+   * When this session completes the execution of a query, remove the query from the list of actively running jobs,
    * and set the {@link #lastSparkJobCompletionTime} to the current timestamp.
    */
   @Override
   public void onQueryCompletion(String queryId) {
-    if (!queryCompleted) {
-      queryCompleted = true;
-    }
     activeJobs.remove(queryId);
     lastSparkJobCompletionTime = System.currentTimeMillis();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/36015eab/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
new file mode 100644
index 0000000..bbf3d9c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.List;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.fail;
+
+/**
+ * With local spark context, all user sessions share the same spark context.
+ */
+public class TestLocalHiveSparkClient {
+
+  private final CyclicBarrier barrier = new CyclicBarrier(2);
+
+  @Test
+  public void testMultiSessionSparkContextReUse() throws MalformedURLException {
+    String confDir = "../data/conf/spark/local/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+    ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
+
+    List<CompletableFuture<Void>> futures =
+        IntStream.range(0, barrier.getParties()).boxed()
+            .map(i -> CompletableFuture.supplyAsync(() -> execute(i), executor))
+            .collect(Collectors.toList());
+
+    futures.forEach(CompletableFuture::join);
+  }
+
+  private Void execute(Integer threadId) {
+    HiveConf conf = new HiveConf();
+
+    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestLocalHiveSparkClient-testMultiSessionSparkContextReuse-local-dir").toString());
+
+    SessionState.start(conf);
+    try{
+      runSparkTestSession(conf, threadId);
+    } catch (Exception ex){
+      fail(ex.getMessage());
+    }
+    return null;
+  }
+
+  private void runSparkTestSession(HiveConf conf, int threadId) throws Exception {
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10s");
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+    Driver driver = null;
+    try {
+      driver = new Driver(new QueryState.Builder()
+          .withGenerateNewQueryId(true)
+          .withHiveConf(conf).build(), null, null);
+
+      SparkSession sparkSession = SparkUtilities.getSparkSession(conf,
+          SparkSessionManagerImpl.getInstance());
+
+      Assert.assertEquals(0, driver.run("show tables").getResponseCode());
+      barrier.await();
+
+      SparkContext sparkContext = getSparkContext(sparkSession);
+      Assert.assertFalse(sparkContext.isStopped());
+
+      if(threadId == 1) {
+        barrier.await();
+        closeSparkSession(sparkSession);
+        Assert.assertTrue(sparkContext.isStopped());
+
+      } else {
+        closeSparkSession(sparkSession);
+        Assert.assertFalse(sparkContext.isStopped());
+        barrier.await();
+      }
+    } finally {
+      if (driver != null) {
+        driver.destroy();
+      }
+    }
+  }
+
+  private void closeSparkSession(SparkSession session) throws ReflectiveOperationException {
+    Assert.assertTrue(session.isOpen());
+    session.close();
+
+    Assert.assertFalse(session.isOpen());
+  }
+
+  private SparkContext getSparkContext(SparkSession sparkSession) throws ReflectiveOperationException {
+    HiveSparkClient sparkClient = getSparkClient(sparkSession);
+    Assert.assertNotNull(sparkClient);
+
+    return getSparkContext(sparkClient).sc();
+  }
+
+  private JavaSparkContext getSparkContext(HiveSparkClient sparkClient) throws ReflectiveOperationException {
+    Field sparkContextField = LocalHiveSparkClient.class.getDeclaredField("sc");
+    sparkContextField.setAccessible(true);
+
+    return (JavaSparkContext) sparkContextField.get(sparkClient);
+  }
+
+  private HiveSparkClient getSparkClient(SparkSession sparkSession) throws ReflectiveOperationException {
+    Field clientField = SparkSessionImpl.class.getDeclaredField("hiveSparkClient");
+    clientField.setAccessible(true);
+
+    return (HiveSparkClient) clientField.get(sparkSession);
+  }
+}