You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/03/10 17:41:44 UTC
[spark] branch branch-3.4 updated: [SPARK-42743][SQL] Support analyze TimestampNTZ columns
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 24f3d4d1791 [SPARK-42743][SQL] Support analyze TimestampNTZ columns
24f3d4d1791 is described below
commit 24f3d4d17913ec90a48ecf9dd23b4db7c19d10c2
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Fri Mar 10 14:33:45 2023 +0300
[SPARK-42743][SQL] Support analyze TimestampNTZ columns
### What changes were proposed in this pull request?
Support analyzing TimestampNTZ columns with the `ANALYZE TABLE` command:
```
ANALYZE TABLE table_name [ PARTITION clause ]
COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col1 [, ...] | FOR ALL COLUMNS ]
```
### Why are the changes needed?
Support computing statistics of TimestampNTZ columns, which can be used for optimizations.
### Does this PR introduce _any_ user-facing change?
No, the TimestampNTZ type is not released yet.
### How was this patch tested?
Update existing UT
Closes #40362 from gengliangwang/analyzeColumn.
Authored-by: Gengliang Wang <ge...@apache.org>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../spark/sql/catalyst/catalog/interface.scala | 9 +++++--
.../sql/catalyst/util/TimestampFormatter.scala | 8 ++++++
.../execution/command/AnalyzeColumnCommand.scala | 5 ++--
.../spark/sql/execution/command/CommandUtils.scala | 6 ++---
.../spark/sql/StatisticsCollectionTestBase.scala | 29 +++++++++++++++++-----
.../apache/spark/sql/hive/StatisticsSuite.scala | 3 ++-
6 files changed, 44 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 829c121c583..6f4c4f27efc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -661,11 +661,13 @@ object CatalogColumnStat extends Logging {
def getTimestampFormatter(
isParsing: Boolean,
format: String = "yyyy-MM-dd HH:mm:ss.SSSSSS",
- zoneId: ZoneId = ZoneOffset.UTC): TimestampFormatter = {
+ zoneId: ZoneId = ZoneOffset.UTC,
+ forTimestampNTZ: Boolean = false): TimestampFormatter = {
TimestampFormatter(
format = format,
zoneId = zoneId,
- isParsing = isParsing)
+ isParsing = isParsing,
+ forTimestampNTZ = forTimestampNTZ)
}
/**
@@ -702,6 +704,9 @@ object CatalogColumnStat extends Logging {
val externalValue = dataType match {
case DateType => DateFormatter().format(v.asInstanceOf[Int])
case TimestampType => getTimestampFormatter(isParsing = false).format(v.asInstanceOf[Long])
+ case TimestampNTZType =>
+ getTimestampFormatter(isParsing = false, forTimestampNTZ = true)
+ .format(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 8ebe77978b5..392e8ebdc6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -525,6 +525,14 @@ object TimestampFormatter {
getFormatter(Some(format), zoneId, isParsing = isParsing)
}
+ def apply(
+ format: String,
+ zoneId: ZoneId,
+ isParsing: Boolean,
+ forTimestampNTZ: Boolean): TimestampFormatter = {
+ getFormatter(Some(format), zoneId, isParsing = isParsing, forTimestampNTZ = forTimestampNTZ)
+ }
+
def apply(zoneId: ZoneId): TimestampFormatter = {
getFormatter(None, zoneId, isParsing = false)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d821b127e06..299f41eb55e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{DatetimeType, _}
/**
@@ -139,8 +139,7 @@ case class AnalyzeColumnCommand(
case _: DecimalType => true
case DoubleType | FloatType => true
case BooleanType => true
- case DateType => true
- case TimestampType => true
+ case _: DatetimeType => true
case BinaryType | StringType => true
case _ => false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index d847868c0ce..c656bdbafa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -329,8 +329,7 @@ object CommandUtils extends Logging {
case _: IntegralType => true
case _: DecimalType => true
case DoubleType | FloatType => true
- case DateType => true
- case TimestampType => true
+ case _: DatetimeType => true
case _ => false
}
@@ -379,8 +378,7 @@ object CommandUtils extends Logging {
case _: DecimalType => fixedLenTypeStruct
case DoubleType | FloatType => fixedLenTypeStruct
case BooleanType => fixedLenTypeStruct
- case DateType => fixedLenTypeStruct
- case TimestampType => fixedLenTypeStruct
+ case _: DatetimeType => fixedLenTypeStruct
case BinaryType | StringType =>
// For string and binary type, we don't compute min, max or histogram
val nullLit = Literal(null, col.dataType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 6c6ef1a118f..04e47ac4a11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.{lang => jl}
import java.io.File
import java.sql.{Date, Timestamp}
+import java.time.LocalDateTime
import scala.collection.mutable
import scala.util.Random
@@ -59,10 +60,12 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
private val t1Internal = date(2016, 5, 8, 0, 0, 1, 123456)
private val t1 = new Timestamp(DateTimeUtils.microsToMillis(t1Internal))
t1.setNanos(123456000)
+ private val tsNTZ1 = LocalDateTime.parse(t1Str.replace(" ", "T"))
private val t2Str = "2016-05-09 00:00:02.987654"
private val t2Internal = date(2016, 5, 9, 0, 0, 2, 987654)
private val t2 = new Timestamp(DateTimeUtils.microsToMillis(t2Internal))
t2.setNanos(987654000)
+ private val tsNTZ2 = LocalDateTime.parse(t2Str.replace(" ", "T"))
private val double1 = 1.123456789
private val double2 = 6.987654321
@@ -74,14 +77,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
protected val data = Seq[
(jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
jl.Double, jl.Float, java.math.BigDecimal,
- String, Array[Byte], Date, Timestamp,
+ String, Array[Byte], Date, Timestamp, LocalDateTime,
Seq[Int])](
// scalastyle:off nonascii
(false, 1.toByte, 1.toShort, 1, 1L, double1, 1.12345f,
- dec1, "string escrito en español", "b1".getBytes, d1, t1, null),
+ dec1, "string escrito en español", "b1".getBytes, d1, t1, tsNTZ1, null),
(true, 2.toByte, 30000.toShort, 40000000, 5536453629L, double2, 7.54321f,
- dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, null),
- (null, null, null, null, null, null, null, null, null, null, null, null, null)
+ dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, tsNTZ2, null),
+ (null, null, null, null, null, null, null, null, null, null, null, null, null, null)
// scalastyle:on nonascii
)
@@ -103,6 +106,8 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
"cdate" -> CatalogColumnStat(Some(2), Some(d1Str), Some(d2Str),
Some(1), Some(4), Some(4)),
"ctimestamp" -> CatalogColumnStat(Some(2), Some(t1Str),
+ Some(t2Str), Some(1), Some(8), Some(8)),
+ "ctimestamp_ntz" -> CatalogColumnStat(Some(2), Some(t1Str),
Some(t2Str), Some(1), Some(8), Some(8))
)
@@ -136,6 +141,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
colStats.update("ctimestamp", stats("ctimestamp").copy(histogram =
Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
HistogramBin(t1Internal, t2Internal, 1))))))
+ colStats.update("ctimestamp_ntz", stats("ctimestamp_ntz").copy(histogram =
+ Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
+ HistogramBin(t1Internal, t2Internal, 1))))))
colStats
}
@@ -220,7 +228,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
"spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
"spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.123456",
"spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
- "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion
+ "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion,
+ "spark.sql.statistics.colStats.ctimestamp_ntz.avgLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.max" -> "2016-05-09 00:00:02.987654",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.maxLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.min" -> "2016-05-08 00:00:01.123456",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.nullCount" -> "1",
+ "spark.sql.statistics.colStats.ctimestamp_ntz.version" -> strVersion
)
val expectedSerializedHistograms = Map(
@@ -241,7 +256,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
"spark.sql.statistics.colStats.cdate.histogram" ->
HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
"spark.sql.statistics.colStats.ctimestamp.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
+ HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get),
+ "spark.sql.statistics.colStats.ctimestamp_ntz.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("ctimestamp_ntz").histogram.get)
)
private val randomName = new Random(31)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 65fd2f72727..507c482525c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1154,7 +1154,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val tableName = "column_stats_test_de"
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
- val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+ // Hive can't parse data type "timestamp_ntz"
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*).drop("ctimestamp_ntz")
withTable(tableName) {
df.write.saveAsTable(tableName)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org