You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/18 09:11:06 UTC
(incubator-paimon) branch master updated: [spark] Support 'show partitions' sql (#2521)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0ec1bb60e [spark] Support 'show partitions' sql (#2521)
0ec1bb60e is described below
commit 0ec1bb60e6a9846717eb20cbc90640da12f2760b
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Dec 18 17:11:00 2023 +0800
[spark] Support 'show partitions' sql (#2521)
---
.../paimon/spark/PaimonPartitionManagement.scala | 9 ++---
.../spark/sql/PaimonPartitionManagementTest.scala | 47 +++++++++++-----------
2 files changed, 27 insertions(+), 29 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index d4461ff1c..c1872804f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -28,8 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType
-import java.util.{Collections, UUID}
-import java.util.{Map => JMap}
+import java.util.{Collections, Map => JMap, UUID}
import scala.collection.JavaConverters._
@@ -89,7 +88,7 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
s"Number of partition names (${partitionCols.length}) must be equal to " +
s"the number of partition values (${internalRow.numFields})."
)
- val schema: StructType = partitionSchema
+ val schema: StructType = partitionSchema()
assert(
partitionCols.forall(fieldName => schema.fieldNames.contains(fieldName)),
s"Some partition names ${partitionCols.mkString("[", ", ", "]")} don't belong to " +
@@ -111,12 +110,12 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
.map {
case (partitionName, index) =>
val internalRowIndex = schema.fieldIndex(partitionName)
- val structField = schema.fields(index)
+ val structField = schema.fields(internalRowIndex)
sparkInternalRow
.get(internalRowIndex, structField.dataType)
.equals(internalRow.get(index, structField.dataType))
}
- .fold(true)(_ && _)
+ .forall(identity)
})
.toArray
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 91a77f54d..38bef009a 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -82,19 +82,18 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
} else {
""
}
- spark.sql(
- s"""
- |CREATE TABLE T (a VARCHAR(10), b CHAR(10),c BIGINT,dt VARCHAR(8),hh VARCHAR(4))
- |PARTITIONED BY (dt, hh)
- |TBLPROPERTIES ($primaryKeysProp 'bucket'='$bucket')
- |""".stripMargin)
-
- spark.sql("INSERT INTO T VALUES('a','b',1,'20230816','1132')")
- spark.sql("INSERT INTO T VALUES('a','b',1,'20230816','1133')")
- spark.sql("INSERT INTO T VALUES('a','b',1,'20230816','1134')")
- spark.sql("INSERT INTO T VALUES('a','b',2,'20230817','1132')")
- spark.sql("INSERT INTO T VALUES('a','b',2,'20230817','1133')")
- spark.sql("INSERT INTO T VALUES('a','b',2,'20230817','1134')")
+ spark.sql(s"""
+ |CREATE TABLE T (a VARCHAR(10), b CHAR(10),c BIGINT,dt LONG,hh VARCHAR(4))
+ |PARTITIONED BY (dt, hh)
+ |TBLPROPERTIES ($primaryKeysProp 'bucket'='$bucket')
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES('a','b',1,20230816,'1132')")
+ spark.sql("INSERT INTO T VALUES('a','b',1,20230816,'1133')")
+ spark.sql("INSERT INTO T VALUES('a','b',1,20230816,'1134')")
+ spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1132')")
+ spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1133')")
+ spark.sql("INSERT INTO T VALUES('a','b',2,20230817,'1134')")
checkAnswer(
spark.sql("show partitions T "),
@@ -104,7 +103,7 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
)
checkAnswer(
- spark.sql("show partitions T PARTITION (dt='20230817', hh='1132')"),
+ spark.sql("show partitions T PARTITION (dt=20230817, hh='1132')"),
Row("dt=20230817/hh=1132") :: Nil)
checkAnswer(
@@ -112,20 +111,20 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
Row("dt=20230816/hh=1132") :: Row("dt=20230817/hh=1132") :: Nil)
checkAnswer(
- spark.sql("show partitions T PARTITION (dt='20230816')"),
+ spark.sql("show partitions T PARTITION (dt=20230816)"),
Row("dt=20230816/hh=1132") :: Row("dt=20230816/hh=1133") :: Row(
"dt=20230816/hh=1134") :: Nil)
checkAnswer(spark.sql("show partitions T PARTITION (hh='1135')"), Nil)
- checkAnswer(spark.sql("show partitions T PARTITION (dt='20230818')"), Nil)
+ checkAnswer(spark.sql("show partitions T PARTITION (dt=20230818)"), Nil)
- spark.sql("alter table T drop partition (dt='20230816', hh='1134')")
+ spark.sql("alter table T drop partition (dt=20230816, hh='1134')")
- spark.sql("alter table T drop partition (dt='20230817', hh='1133')")
+ spark.sql("alter table T drop partition (dt=20230817, hh='1133')")
assertThrows[AnalysisException] {
- spark.sql("alter table T drop partition (dt='20230816')")
+ spark.sql("alter table T drop partition (dt=20230816)")
}
assertThrows[AnalysisException] {
@@ -138,7 +137,7 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
:: Row("dt=20230817/hh=1132") :: Row("dt=20230817/hh=1134") :: Nil)
checkAnswer(
- spark.sql("show partitions T PARTITION (dt='20230817', hh='1132')"),
+ spark.sql("show partitions T PARTITION (dt=20230817, hh='1132')"),
Row("dt=20230817/hh=1132") :: Nil)
checkAnswer(
@@ -150,17 +149,17 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
Row("dt=20230817/hh=1134") :: Nil)
checkAnswer(
- spark.sql("show partitions T PARTITION (dt='20230817')"),
+ spark.sql("show partitions T PARTITION (dt=20230817)"),
Row("dt=20230817/hh=1132") :: Row("dt=20230817/hh=1134") :: Nil)
checkAnswer(
spark.sql("select * from T"),
- Row("a", "b", 1L, "20230816", "1132") :: Row("a", "b", 1L, "20230816", "1133") :: Row(
+ Row("a", "b", 1L, 20230816L, "1132") :: Row("a", "b", 1L, 20230816L, "1133") :: Row(
"a",
"b",
2L,
- "20230817",
- "1132") :: Row("a", "b", 2L, "20230817", "1134") :: Nil
+ 20230817L,
+ "1132") :: Row("a", "b", 2L, 20230817L, "1134") :: Nil
)
}
}