You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/09 05:38:30 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3192] Refactor TPCDSConf

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7720c9f6e [KYUUBI #3192] Refactor TPCDSConf
7720c9f6e is described below

commit 7720c9f6e22d71ff71c37772c3fde2104570ec6b
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Aug 9 13:38:20 2022 +0800

    [KYUUBI #3192] Refactor TPCDSConf
    
    ### _Why are the changes needed?_
    
    This PR
    
    1. centralizes all configurations of the Kyuubi Spark TPC-DS connector into `object TPCDSConf`
    2. renames `TPCDSBatchScanConf` to `TPCDSReadConf`
    3. adds the prefix `spark.connector.tpcds` to the corresponding table properties (since we don't support `CREATE TABLE ... TBLPROPERTIES` or `ALTER TABLE ... SET TBLPROPERTIES`, the change affects nothing)
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3192 from pan3793/tpcds-conf.
    
    Closes #3192
    
    70de44d4 [Cheng Pan] fix ut
    2e59d932 [Cheng Pan] sytle
    0eaf97bd [Cheng Pan] Refactor TPCDSConf
    16ab878e [Cheng Pan] Refactor TPCDSConf
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../spark/connector/tpcds/TPCDSBatchScan.scala     |  4 +-
 .../spark/connector/tpcds/TPCDSBatchScanConf.scala | 49 ----------------------
 .../spark/connector/tpcds/TPCDSCatalog.scala       | 10 ++---
 .../kyuubi/spark/connector/tpcds/TPCDSConf.scala   | 38 +++++++++++++++--
 .../kyuubi/spark/connector/tpcds/TPCDSTable.scala  |  8 +---
 .../spark/connector/tpcds/TPCDSCatalogSuite.scala  | 17 ++++++--
 .../spark/connector/tpcds/TPCDSTableSuite.scala    |  4 +-
 .../spark/kyuubi/TPCDSTableGenerateBenchmark.scala |  8 +++-
 8 files changed, 65 insertions(+), 73 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
index 4fa243271..291031c53 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
@@ -37,7 +37,7 @@ class TPCDSBatchScan(
     @transient table: Table,
     scale: Double,
     schema: StructType,
-    scanConf: TPCDSBatchScanConf) extends ScanBuilder
+    readConf: TPCDSReadConf) extends ScanBuilder
   with SupportsReportStatistics with Batch with Serializable {
 
   private val _sizeInBytes: Long = TPCDSStatisticsUtils.sizeInBytes(table, scale)
@@ -49,7 +49,7 @@ class TPCDSBatchScan(
     if (table.isSmall) 1
     else math.max(
       SparkSession.active.sparkContext.defaultParallelism,
-      (_sizeInBytes / scanConf.maxPartitionBytes).ceil.toInt)
+      (_sizeInBytes / readConf.maxPartitionBytes).ceil.toInt)
 
   override def build: Scan = this
 
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala
deleted file mode 100644
index cf99b6bb1..000000000
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.spark.connector.tpcds
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import org.apache.kyuubi.spark.connector.common.SparkConfParser
-import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf._
-
-case class TPCDSBatchScanConf(
-    spark: SparkSession,
-    table: Table,
-    options: CaseInsensitiveStringMap) {
-
-  private val confParser: SparkConfParser =
-    SparkConfParser(options, spark.conf, table.properties())
-
-  lazy val maxPartitionBytes: Long = confParser.longConf()
-    .option(MAX_PARTITION_BYTES_CONF)
-    .sessionConf(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
-    .tableProperty(MAX_PARTITION_BYTES_CONF)
-    .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
-    .parse()
-
-}
-
-object TPCDSBatchScanConf {
-  val TPCDS_CONNECTOR_READ_CONF_PREFIX = "spark.connector.tpcds.read"
-
-  val MAX_PARTITION_BYTES_CONF = "maxPartitionBytes"
-  val MAX_PARTITION_BYTES_DEFAULT = 128 * 1024 * 1024L
-}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
index 06a36cac0..2e1d21131 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
@@ -22,6 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table => SparkTable, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.expressions.Transform
@@ -34,7 +35,7 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
 
   val tables: Array[String] = TPCDSSchemaUtils.BASE_TABLES.map(_.getName)
 
-  var options: CaseInsensitiveStringMap = _
+  var tpcdsConf: TPCDSConf = _
 
   var _name: String = _
 
@@ -42,9 +43,8 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
 
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
     this._name = name
-    this.options = options
-    val uncheckedExcludeDatabases = options.getOrDefault("excludeDatabases", "")
-      .split(",").map(_.toLowerCase.trim).filter(_.nonEmpty)
+    this.tpcdsConf = TPCDSConf(SparkSession.active, options)
+    val uncheckedExcludeDatabases = tpcdsConf.excludeDatabases
     val invalidExcludeDatabases = uncheckedExcludeDatabases diff TPCDSSchemaUtils.DATABASES
     if (invalidExcludeDatabases.nonEmpty) {
       logWarning(
@@ -64,7 +64,7 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
   override def loadTable(ident: Identifier): SparkTable = (ident.namespace, ident.name) match {
     case (Array(db), table) if (databases contains db) && tables.contains(table.toLowerCase) =>
       val scale = TPCDSSchemaUtils.scale(db)
-      new TPCDSTable(table.toLowerCase, scale, options)
+      new TPCDSTable(table.toLowerCase, scale, tpcdsConf)
     case (_, _) => throw new NoSuchTableException(ident)
   }
 
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
index f2a777312..6b62fac1a 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.spark.connector.tpcds
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.common.SparkConfParser
@@ -26,12 +27,20 @@ import org.apache.kyuubi.spark.connector.tpcds.TPCDSConf._
 case class TPCDSConf(spark: SparkSession, options: CaseInsensitiveStringMap) {
 
   private val confParser: SparkConfParser = SparkConfParser(options, spark.conf, null)
+
+  lazy val excludeDatabases: Array[String] = confParser.stringConf()
+    .option(EXCLUDE_DATABASES)
+    .parseOptional()
+    .map(_.split(",").map(_.toLowerCase.trim).filter(_.nonEmpty))
+    .getOrElse(Array.empty)
+
   // When true, use CHAR VARCHAR; otherwise use STRING
   lazy val useAnsiStringType: Boolean = confParser.booleanConf()
     .option(USE_ANSI_STRING_TYPE)
-    .sessionConf(USE_ANSI_STRING_TYPE)
+    .sessionConf(s"$TPCDS_CONNECTOR_CONF_PREFIX.$USE_ANSI_STRING_TYPE")
     .defaultValue(USE_ANSI_STRING_TYPE_DEFAULT)
     .parse()
+
   // 09-26-2017 v2.6.0
   // Replaced two occurrences of "c_last_review_date" with "c_last_review_date_sk" to be consistent
   // with Table 2-14 (Customer Table Column Definitions) in section 2.4.7 of the specification
@@ -40,14 +49,37 @@ case class TPCDSConf(spark: SparkSession, options: CaseInsensitiveStringMap) {
   // https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf
   lazy val useTableSchema_2_6: Boolean = confParser.booleanConf()
     .option(USE_TABLE_SCHEMA_2_6)
-    .sessionConf(USE_TABLE_SCHEMA_2_6)
+    .sessionConf(s"$TPCDS_CONNECTOR_CONF_PREFIX.$USE_TABLE_SCHEMA_2_6")
     .defaultValue(USE_TABLE_SCHEMA_2_6_DEFAULT)
     .parse()
 }
 
+case class TPCDSReadConf(
+    spark: SparkSession,
+    table: Table,
+    options: CaseInsensitiveStringMap) {
+
+  private val confParser: SparkConfParser =
+    SparkConfParser(options, spark.conf, table.properties)
+
+  lazy val maxPartitionBytes: Long = confParser.longConf()
+    .option(MAX_PARTITION_BYTES_CONF)
+    .sessionConf(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
+    .tableProperty(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
+    .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
+    .parse()
+}
+
 object TPCDSConf {
+  val EXCLUDE_DATABASES = "excludeDatabases"
+
+  val TPCDS_CONNECTOR_CONF_PREFIX = "spark.connector.tpcds"
   val USE_ANSI_STRING_TYPE = "useAnsiStringType"
-  val USE_ANSI_STRING_TYPE_DEFAULT = false;
+  val USE_ANSI_STRING_TYPE_DEFAULT = false
   val USE_TABLE_SCHEMA_2_6 = "useTableSchema_2_6"
   val USE_TABLE_SCHEMA_2_6_DEFAULT = true
+
+  val TPCDS_CONNECTOR_READ_CONF_PREFIX = s"$TPCDS_CONNECTOR_CONF_PREFIX.read"
+  val MAX_PARTITION_BYTES_CONF = "maxPartitionBytes"
+  val MAX_PARTITION_BYTES_DEFAULT: Long = 128 * 1024 * 1024L
 }
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
index d75f59ce9..d635e1650 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
@@ -32,15 +32,11 @@ import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
+class TPCDSTable(tbl: String, scale: Double, tpcdsConf: TPCDSConf)
   extends SparkTable with SupportsRead {
 
   val tpcdsTable: Table = Table.getTable(tbl)
 
-  lazy val spark: SparkSession = SparkSession.active
-
-  lazy val tpcdsConf: TPCDSConf = TPCDSConf(spark, options);
-
   override def name: String = s"${TPCDSSchemaUtils.dbName(scale)}.$tbl"
 
   override def toString: String = s"TPCDSTable($name)"
@@ -72,7 +68,7 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
     Set(TableCapability.BATCH_READ).asJava
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    val scanConf = TPCDSBatchScanConf(spark, this, options)
+    val scanConf = TPCDSReadConf(SparkSession.active, this, options)
     new TPCDSBatchScan(tpcdsTable, scale, schema, scanConf)
   }
 
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
index 3ecfeabb7..8a37d95e8 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
@@ -28,10 +28,19 @@ import org.apache.kyuubi.spark.connector.common.SparkUtils
 class TPCDSCatalogSuite extends KyuubiFunSuite {
 
   test("get catalog name") {
-    val catalog = new TPCDSCatalog
-    val catalogName = "test"
-    catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
-    assert(catalog._name == catalogName)
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .set("spark.ui.enabled", "false")
+      .set("spark.sql.catalogImplementation", "in-memory")
+      .set("spark.sql.catalog.tpcds", classOf[TPCDSCatalog].getName)
+      .set("spark.sql.cbo.enabled", "true")
+      .set("spark.sql.cbo.planStats.enabled", "true")
+    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+      val catalog = new TPCDSCatalog
+      val catalogName = "test"
+      catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
+      assert(catalog._name == catalogName)
+    }
   }
 
   test("supports namespaces") {
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
index af983d8c8..f948ec4ef 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
-import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf.{MAX_PARTITION_BYTES_CONF, TPCDS_CONNECTOR_READ_CONF_PREFIX}
+import org.apache.kyuubi.spark.connector.tpcds.TPCDSConf._
 
 class TPCDSTableSuite extends KyuubiFunSuite {
 
@@ -144,7 +144,7 @@ class TPCDSTableSuite extends KyuubiFunSuite {
       assert(scan.isDefined)
       val expected =
         (TPCDSStatisticsUtils.sizeInBytes(table, scale) / maxPartitionBytes).ceil.toInt
-      assert(scan.get.planInputPartitions.size == expected)
+      assert(scan.get.planInputPartitions.length == expected)
     }
   }
 }
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
index ba8ef64d0..754d80b00 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
@@ -21,10 +21,11 @@ import scala.concurrent.duration._
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.kyuubi.benchmark.KyuubiBenchmarkBase
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.spark.connector.tpcds.{TPCDSPartitionReader, TPCDSSchemaUtils, TPCDSStatisticsUtils, TPCDSTable}
+import org.apache.kyuubi.spark.connector.tpcds._
 
 /**
  * Benchmark to measure the performance of generate TPCDSTable.
@@ -67,7 +68,10 @@ class TPCDSTableGenerateBenchmark extends KyuubiFunSuite with KyuubiBenchmarkBas
   }
 
   private def generateTable(tableName: String, rowCount: Long): Unit = {
-    val table = new TPCDSTable(tableName, scale, CaseInsensitiveStringMap.empty())
+    val table = new TPCDSTable(
+      tableName,
+      scale,
+      TPCDSConf(SparkSession.active, CaseInsensitiveStringMap.empty()))
     val reader = new TPCDSPartitionReader(
       tableName,
       scale,