Posted to commits@spark.apache.org by li...@apache.org on 2018/03/02 18:38:54 UTC

spark git commit: [SPARK-23518][SQL] Avoid metastore access when the users only want to read and write data frames

Repository: spark
Updated Branches:
  refs/heads/master 0b6ceadeb -> 3a4d15e5d


[SPARK-23518][SQL] Avoid metastore access when the users only want to read and write data frames

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18944 added a patch that allows a Spark session to be created even when the Hive metastore server is down. However, it did not allow running any commands with that session. This is troublesome for users who only want to read and write data frames without a metastore setup.
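
For illustration, a minimal sketch of the usage this change enables; the metastore URI and paths below are hypothetical, and these session settings are only one way to reproduce the scenario:

  import org.apache.spark.sql.SparkSession

  // Hypothetical setup: Hive support is on, but the metastore host is down.
  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.hadoop.hive.metastore.uris", "thrift://unreachable-host:9083")
    .enableHiveSupport()
    .getOrCreate()

  // None of these touch the catalog, so after this patch they succeed:
  spark.sql("select 1 + 1").show()
  spark.range(0, 10).write.parquet("/tmp/spark-23518-demo")
  assert(spark.read.parquet("/tmp/spark-23518-demo").count() == 10)

  // Only a catalog-touching command such as `spark.sql("show tables")`
  // should still fail with a metastore connection error.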

## How was this patch tested?

Added unit tests that read and write data frames, based on the original HiveMetastoreLazyInitializationSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <fe...@databricks.com>

Closes #20681 from liufengdb/completely-lazy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a4d15e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a4d15e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a4d15e5

Branch: refs/heads/master
Commit: 3a4d15e5d2b9ddbaeb2a6ab2d86d059ada6407b2
Parents: 0b6cead
Author: Feng Liu <fe...@databricks.com>
Authored: Fri Mar 2 10:38:50 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Mar 2 10:38:50 2018 -0800

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R                 |  2 ++
 .../spark/sql/catalyst/catalog/SessionCatalog.scala   | 11 +++++++----
 .../spark/sql/internal/BaseSessionStateBuilder.scala  |  4 ++--
 .../hive/HiveMetastoreLazyInitializationSuite.scala   | 14 ++++++++++++++
 .../apache/spark/sql/hive/HiveSessionCatalog.scala    |  8 ++++----
 .../spark/sql/hive/HiveSessionStateBuilder.scala      | 10 +++++-----
 6 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 5197838..bd0a0dc 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -67,6 +67,8 @@ sparkSession <- if (windows_with_hadoop()) {
     sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
   }
 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+# materialize the catalog implementation
+listTables()
 
 mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Andy\", \"age\":30}",

http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4b119c7..64e7ca1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,8 +54,8 @@ object SessionCatalog {
  * This class must be thread-safe.
  */
 class SessionCatalog(
-    val externalCatalog: ExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => ExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -70,8 +70,8 @@ class SessionCatalog(
       functionRegistry: FunctionRegistry,
       conf: SQLConf) {
     this(
-      externalCatalog,
-      new GlobalTempViewManager("global_temp"),
+      () => externalCatalog,
+      () => new GlobalTempViewManager("global_temp"),
       functionRegistry,
       conf,
       new Configuration(),
@@ -87,6 +87,9 @@ class SessionCatalog(
       new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
   }
 
+  lazy val externalCatalog = externalCatalogBuilder()
+  lazy val globalTempViewManager = globalTempViewManagerBuilder()
+
   /** List of temporary views, mapping from table name to their logical plan. */
   @GuardedBy("this")
   protected val tempViews = new mutable.HashMap[String, LogicalPlan]
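
The hunk above is the core of the fix: SessionCatalog now receives zero-argument builder functions instead of materialized instances, and lazy vals defer construction until first access. A minimal standalone sketch of the pattern (names illustrative, not Spark's):

  // Illustrative only: the dependency arrives as a thunk and is built
  // the first time it is dereferenced, not at construction time.
  class LazyHolder(builder: () => String) {
    lazy val value: String = builder() // runs once, on first access
  }

  val holder = new LazyHolder(() => {
    println("expensive initialization...") // deferred side effect
    "external-catalog"
  })
  // Nothing has run yet; the builder fires only here:
  println(holder.value)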

http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 007f876..3a0db7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -130,8 +130,8 @@ abstract class BaseSessionStateBuilder(
    */
   protected lazy val catalog: SessionCatalog = {
     val catalog = new SessionCatalog(
-      session.sharedState.externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => session.sharedState.externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       functionRegistry,
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),

http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
index 3f135cc..277df54 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
@@ -38,6 +38,20 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
       // We should be able to run Spark jobs without Hive client.
       assert(spark.sparkContext.range(0, 1).count() === 1)
 
+      // We should be able to use Spark SQL if no table references.
+      assert(spark.sql("select 1 + 1").count() === 1)
+      assert(spark.range(0, 1).count() === 1)
+
+      // We should be able to use fs
+      val path = Utils.createTempDir()
+      path.delete()
+      try {
+        spark.range(0, 1).write.parquet(path.getAbsolutePath)
+        assert(spark.read.parquet(path.getAbsolutePath).count() === 1)
+      } finally {
+        Utils.deleteRecursively(path)
+      }
+
       // Make sure that we are not using the local derby metastore.
       val exceptionString = Utils.exceptionString(intercept[AnalysisException] {
         spark.sql("show tables")

http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1f11adb..e5aff3b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType}
 
 
 private[sql] class HiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => HiveExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
@@ -48,8 +48,8 @@ private[sql] class HiveSessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends SessionCatalog(
-      externalCatalog,
-      globalTempViewManager,
+      externalCatalogBuilder,
+      globalTempViewManagerBuilder,
       functionRegistry,
       conf,
       hadoopConf,

http://git-wip-us.apache.org/repos/asf/spark/blob/3a4d15e5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 12c7436..40b9bb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -42,8 +42,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, () => externalCatalog.client)
   }
 
   /**
@@ -51,8 +50,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
    */
   override protected lazy val catalog: HiveSessionCatalog = {
     val catalog = new HiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       new HiveMetastoreCatalog(session),
       functionRegistry,
       conf,
@@ -105,8 +104,9 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
 class HiveSessionResourceLoader(
     session: SparkSession,
-    client: HiveClient)
+    clientBuilder: () => HiveClient)
   extends SessionResourceLoader(session) {
+  private lazy val client = clientBuilder()
   override def addJar(path: String): Unit = {
     client.addJar(path)
     super.addJar(path)
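
The same pattern applies to the resource loader: the HiveClient is now produced by a thunk and cached in a private lazy val, so a metastore connection is established only if addJar is actually invoked. A reduced sketch (types simplified to keep it self-contained):

  // Illustrative: constructing the loader does not touch the client.
  class Loader(clientBuilder: () => String) {
    private lazy val client: String = clientBuilder() // forced inside addJar
    def addJar(path: String): Unit = println(s"$client registers $path")
  }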

