You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/09 05:38:30 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3192] Refactor TPCDSConf
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7720c9f6e [KYUUBI #3192] Refactor TPCDSConf
7720c9f6e is described below
commit 7720c9f6e22d71ff71c37772c3fde2104570ec6b
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Aug 9 13:38:20 2022 +0800
[KYUUBI #3192] Refactor TPCDSConf
### _Why are the changes needed?_
This PR
1. Centralize all configurations of the Kyuubi Spark TPC-DS connector into `object TPCDSConf`.
2. Rename `TPCDSBatchScanConf` to `TPCDSReadConf` (now defined in `TPCDSConf.scala`).
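3. Add the prefix `spark.connector.tpcds` to the corresponding session configuration keys and table properties (e.g. `spark.connector.tpcds.useAnsiStringType`, `spark.connector.tpcds.read.maxPartitionBytes`). Since `CREATE TABLE ... TBLPROPERTIES` and `ALTER TABLE ... SET TBLPROPERTIES` are not supported, the table property rename affects nothing.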
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3192 from pan3793/tpcds-conf.
Closes #3192
70de44d4 [Cheng Pan] fix ut
2e59d932 [Cheng Pan] sytle
0eaf97bd [Cheng Pan] Refactor TPCDSConf
16ab878e [Cheng Pan] Refactor TPCDSConf
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../spark/connector/tpcds/TPCDSBatchScan.scala | 4 +-
.../spark/connector/tpcds/TPCDSBatchScanConf.scala | 49 ----------------------
.../spark/connector/tpcds/TPCDSCatalog.scala | 10 ++---
.../kyuubi/spark/connector/tpcds/TPCDSConf.scala | 38 +++++++++++++++--
.../kyuubi/spark/connector/tpcds/TPCDSTable.scala | 8 +---
.../spark/connector/tpcds/TPCDSCatalogSuite.scala | 17 ++++++--
.../spark/connector/tpcds/TPCDSTableSuite.scala | 4 +-
.../spark/kyuubi/TPCDSTableGenerateBenchmark.scala | 8 +++-
8 files changed, 65 insertions(+), 73 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
index 4fa243271..291031c53 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScan.scala
@@ -37,7 +37,7 @@ class TPCDSBatchScan(
@transient table: Table,
scale: Double,
schema: StructType,
- scanConf: TPCDSBatchScanConf) extends ScanBuilder
+ readConf: TPCDSReadConf) extends ScanBuilder
with SupportsReportStatistics with Batch with Serializable {
private val _sizeInBytes: Long = TPCDSStatisticsUtils.sizeInBytes(table, scale)
@@ -49,7 +49,7 @@ class TPCDSBatchScan(
if (table.isSmall) 1
else math.max(
SparkSession.active.sparkContext.defaultParallelism,
- (_sizeInBytes / scanConf.maxPartitionBytes).ceil.toInt)
+ (_sizeInBytes / readConf.maxPartitionBytes).ceil.toInt)
override def build: Scan = this
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala
deleted file mode 100644
index cf99b6bb1..000000000
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSBatchScanConf.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.spark.connector.tpcds
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import org.apache.kyuubi.spark.connector.common.SparkConfParser
-import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf._
-
-case class TPCDSBatchScanConf(
- spark: SparkSession,
- table: Table,
- options: CaseInsensitiveStringMap) {
-
- private val confParser: SparkConfParser =
- SparkConfParser(options, spark.conf, table.properties())
-
- lazy val maxPartitionBytes: Long = confParser.longConf()
- .option(MAX_PARTITION_BYTES_CONF)
- .sessionConf(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
- .tableProperty(MAX_PARTITION_BYTES_CONF)
- .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
- .parse()
-
-}
-
-object TPCDSBatchScanConf {
- val TPCDS_CONNECTOR_READ_CONF_PREFIX = "spark.connector.tpcds.read"
-
- val MAX_PARTITION_BYTES_CONF = "maxPartitionBytes"
- val MAX_PARTITION_BYTES_DEFAULT = 128 * 1024 * 1024L
-}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
index 06a36cac0..2e1d21131 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
@@ -22,6 +22,7 @@ import java.util
import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table => SparkTable, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
@@ -34,7 +35,7 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
val tables: Array[String] = TPCDSSchemaUtils.BASE_TABLES.map(_.getName)
- var options: CaseInsensitiveStringMap = _
+ var tpcdsConf: TPCDSConf = _
var _name: String = _
@@ -42,9 +43,8 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
this._name = name
- this.options = options
- val uncheckedExcludeDatabases = options.getOrDefault("excludeDatabases", "")
- .split(",").map(_.toLowerCase.trim).filter(_.nonEmpty)
+ this.tpcdsConf = TPCDSConf(SparkSession.active, options)
+ val uncheckedExcludeDatabases = tpcdsConf.excludeDatabases
val invalidExcludeDatabases = uncheckedExcludeDatabases diff TPCDSSchemaUtils.DATABASES
if (invalidExcludeDatabases.nonEmpty) {
logWarning(
@@ -64,7 +64,7 @@ class TPCDSCatalog extends TableCatalog with SupportsNamespaces with Logging {
override def loadTable(ident: Identifier): SparkTable = (ident.namespace, ident.name) match {
case (Array(db), table) if (databases contains db) && tables.contains(table.toLowerCase) =>
val scale = TPCDSSchemaUtils.scale(db)
- new TPCDSTable(table.toLowerCase, scale, options)
+ new TPCDSTable(table.toLowerCase, scale, tpcdsConf)
case (_, _) => throw new NoSuchTableException(ident)
}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
index f2a777312..6b62fac1a 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSConf.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.tpcds
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.common.SparkConfParser
@@ -26,12 +27,20 @@ import org.apache.kyuubi.spark.connector.tpcds.TPCDSConf._
case class TPCDSConf(spark: SparkSession, options: CaseInsensitiveStringMap) {
private val confParser: SparkConfParser = SparkConfParser(options, spark.conf, null)
+
+ lazy val excludeDatabases: Array[String] = confParser.stringConf()
+ .option(EXCLUDE_DATABASES)
+ .parseOptional()
+ .map(_.split(",").map(_.toLowerCase.trim).filter(_.nonEmpty))
+ .getOrElse(Array.empty)
+
// When true, use CHAR VARCHAR; otherwise use STRING
lazy val useAnsiStringType: Boolean = confParser.booleanConf()
.option(USE_ANSI_STRING_TYPE)
- .sessionConf(USE_ANSI_STRING_TYPE)
+ .sessionConf(s"$TPCDS_CONNECTOR_CONF_PREFIX.$USE_ANSI_STRING_TYPE")
.defaultValue(USE_ANSI_STRING_TYPE_DEFAULT)
.parse()
+
// 09-26-2017 v2.6.0
// Replaced two occurrences of "c_last_review_date" with "c_last_review_date_sk" to be consistent
// with Table 2-14 (Customer Table Column Definitions) in section 2.4.7 of the specification
@@ -40,14 +49,37 @@ case class TPCDSConf(spark: SparkSession, options: CaseInsensitiveStringMap) {
// https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v3.2.0.pdf
lazy val useTableSchema_2_6: Boolean = confParser.booleanConf()
.option(USE_TABLE_SCHEMA_2_6)
- .sessionConf(USE_TABLE_SCHEMA_2_6)
+ .sessionConf(s"$TPCDS_CONNECTOR_CONF_PREFIX.$USE_TABLE_SCHEMA_2_6")
.defaultValue(USE_TABLE_SCHEMA_2_6_DEFAULT)
.parse()
}
+case class TPCDSReadConf(
+ spark: SparkSession,
+ table: Table,
+ options: CaseInsensitiveStringMap) {
+
+ private val confParser: SparkConfParser =
+ SparkConfParser(options, spark.conf, table.properties)
+
+ lazy val maxPartitionBytes: Long = confParser.longConf()
+ .option(MAX_PARTITION_BYTES_CONF)
+ .sessionConf(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
+ .tableProperty(s"$TPCDS_CONNECTOR_READ_CONF_PREFIX.$MAX_PARTITION_BYTES_CONF")
+ .defaultValue(MAX_PARTITION_BYTES_DEFAULT)
+ .parse()
+}
+
object TPCDSConf {
+ val EXCLUDE_DATABASES = "excludeDatabases"
+
+ val TPCDS_CONNECTOR_CONF_PREFIX = "spark.connector.tpcds"
val USE_ANSI_STRING_TYPE = "useAnsiStringType"
- val USE_ANSI_STRING_TYPE_DEFAULT = false;
+ val USE_ANSI_STRING_TYPE_DEFAULT = false
val USE_TABLE_SCHEMA_2_6 = "useTableSchema_2_6"
val USE_TABLE_SCHEMA_2_6_DEFAULT = true
+
+ val TPCDS_CONNECTOR_READ_CONF_PREFIX = s"$TPCDS_CONNECTOR_CONF_PREFIX.read"
+ val MAX_PARTITION_BYTES_CONF = "maxPartitionBytes"
+ val MAX_PARTITION_BYTES_DEFAULT: Long = 128 * 1024 * 1024L
}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
index d75f59ce9..d635e1650 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
@@ -32,15 +32,11 @@ import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
+class TPCDSTable(tbl: String, scale: Double, tpcdsConf: TPCDSConf)
extends SparkTable with SupportsRead {
val tpcdsTable: Table = Table.getTable(tbl)
- lazy val spark: SparkSession = SparkSession.active
-
- lazy val tpcdsConf: TPCDSConf = TPCDSConf(spark, options);
-
override def name: String = s"${TPCDSSchemaUtils.dbName(scale)}.$tbl"
override def toString: String = s"TPCDSTable($name)"
@@ -72,7 +68,7 @@ class TPCDSTable(tbl: String, scale: Double, options: CaseInsensitiveStringMap)
Set(TableCapability.BATCH_READ).asJava
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
- val scanConf = TPCDSBatchScanConf(spark, this, options)
+ val scanConf = TPCDSReadConf(SparkSession.active, this, options)
new TPCDSBatchScan(tpcdsTable, scale, schema, scanConf)
}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
index 3ecfeabb7..8a37d95e8 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
@@ -28,10 +28,19 @@ import org.apache.kyuubi.spark.connector.common.SparkUtils
class TPCDSCatalogSuite extends KyuubiFunSuite {
test("get catalog name") {
- val catalog = new TPCDSCatalog
- val catalogName = "test"
- catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
- assert(catalog._name == catalogName)
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "in-memory")
+ .set("spark.sql.catalog.tpcds", classOf[TPCDSCatalog].getName)
+ .set("spark.sql.cbo.enabled", "true")
+ .set("spark.sql.cbo.planStats.enabled", "true")
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ val catalog = new TPCDSCatalog
+ val catalogName = "test"
+ catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
+ assert(catalog._name == catalogName)
+ }
}
test("supports namespaces") {
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
index af983d8c8..f948ec4ef 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
-import org.apache.kyuubi.spark.connector.tpcds.TPCDSBatchScanConf.{MAX_PARTITION_BYTES_CONF, TPCDS_CONNECTOR_READ_CONF_PREFIX}
+import org.apache.kyuubi.spark.connector.tpcds.TPCDSConf._
class TPCDSTableSuite extends KyuubiFunSuite {
@@ -144,7 +144,7 @@ class TPCDSTableSuite extends KyuubiFunSuite {
assert(scan.isDefined)
val expected =
(TPCDSStatisticsUtils.sizeInBytes(table, scale) / maxPartitionBytes).ceil.toInt
- assert(scan.get.planInputPartitions.size == expected)
+ assert(scan.get.planInputPartitions.length == expected)
}
}
}
diff --git a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
index ba8ef64d0..754d80b00 100644
--- a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/spark/kyuubi/TPCDSTableGenerateBenchmark.scala
@@ -21,10 +21,11 @@ import scala.concurrent.duration._
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.kyuubi.benchmark.KyuubiBenchmarkBase
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.spark.connector.tpcds.{TPCDSPartitionReader, TPCDSSchemaUtils, TPCDSStatisticsUtils, TPCDSTable}
+import org.apache.kyuubi.spark.connector.tpcds._
/**
* Benchmark to measure the performance of generate TPCDSTable.
@@ -67,7 +68,10 @@ class TPCDSTableGenerateBenchmark extends KyuubiFunSuite with KyuubiBenchmarkBas
}
private def generateTable(tableName: String, rowCount: Long): Unit = {
- val table = new TPCDSTable(tableName, scale, CaseInsensitiveStringMap.empty())
+ val table = new TPCDSTable(
+ tableName,
+ scale,
+ TPCDSConf(SparkSession.active, CaseInsensitiveStringMap.empty()))
val reader = new TPCDSPartitionReader(
tableName,
scale,