You are viewing a plain-text version of this message; the canonical version is in the commits@kyuubi.apache.org list archive.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/06/13 12:01:51 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2870] Fix sf0 query error in TPCH
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 883889517 [KYUUBI #2870] Fix sf0 query error in TPCH
883889517 is described below
commit 883889517cc5b0fc37f7253a501d814caf883259
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Mon Jun 13 20:01:39 2022 +0800
[KYUUBI #2870] Fix sf0 query error in TPCH
### _Why are the changes needed?_
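Fixes #2870: querying a table in the `sf0` (scale factor 0) schema of the TPC-H connector failed with an error. After this change, `sf0` tables can be queried and contain no rows.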
### _How was this patch tested?_
- [x] Add test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2871 from jiaoqingbo/kyuubi-2870.
Closes #2870
c1fa0bed [jiaoqingbo] modify ut
643adf74 [jiaoqingbo] code review
530ee90f [jiaoqingbo] [KYUUBI #2870] Fix sf0 query error in TPCH
Authored-by: jiaoqingbo <11...@qq.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/spark/connector/tpch/TPCHBatchScan.scala | 15 ++++++++++++++-
.../spark/connector/tpch/TPCHCatalogSuite.scala | 20 ++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala b/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala
index 2ce4e1a16..e05e717fc 100644
--- a/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch/TPCHBatchScan.scala
@@ -19,9 +19,11 @@ package org.apache.kyuubi.spark.connector.tpch
import java.time.LocalDate
import java.time.format.DateTimeFormatter
+import java.util
import java.util.OptionalLong
import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
import io.trino.tpch._
import io.trino.tpch.GenerateUtils.formatDate
@@ -32,6 +34,8 @@ import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.kyuubi.spark.connector.tpch.TPCHSchemaUtils.normalize
+
case class TPCHTableChuck(table: String, scale: Double, parallelism: Int, index: Int)
extends InputPartition
@@ -94,7 +98,16 @@ class TPCHPartitionReader(
private lazy val dateFmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
- private val iterator = tpchTable.createGenerator(scale, index, parallelism).iterator
+ private val iterator = {
+ if (normalize(scale).equals("0")) {
+ new util.Iterator[TpchEntity] {
+
+ override def hasNext: Boolean = false
+
+ override def next(): TpchEntity = throw new NoSuchElementException("next on empty iterator")
+ }
+ } else tpchTable.createGenerator(scale, index, parallelism).iterator
+ }
private var currentRow: InternalRow = _
diff --git a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
index abb3581cb..70156a753 100644
--- a/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-tpch/src/test/scala/org/apache/kyuubi/spark/connector/tpch/TPCHCatalogSuite.scala
@@ -99,6 +99,26 @@ class TPCHCatalogSuite extends KyuubiFunSuite {
}
}
+ test("tpch.sf0 count") {
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "in-memory")
+ .set("spark.sql.catalog.tpch", classOf[TPCHCatalog].getName)
+ .set("spark.sql.cbo.enabled", "true")
+ .set("spark.sql.cbo.planStats.enabled", "true")
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ assert(spark.table("tpch.sf0.customer").count === 0)
+ assert(spark.table("tpch.sf0.orders").count === 0)
+ assert(spark.table("tpch.sf0.lineitem").count === 0)
+ assert(spark.table("tpch.sf0.part").count === 0)
+ assert(spark.table("tpch.sf0.partsupp").count === 0)
+ assert(spark.table("tpch.sf0.supplier").count === 0)
+ assert(spark.table("tpch.sf0.nation").count === 0)
+ assert(spark.table("tpch.sf0.region").count === 0)
+ }
+ }
+
test("tpch.sf1 stats") {
val sparkConf = new SparkConf()
.setMaster("local[*]")