Posted to commits@spark.apache.org by ge...@apache.org on 2023/03/10 17:41:44 UTC

[spark] branch branch-3.4 updated: [SPARK-42743][SQL] Support analyze TimestampNTZ columns

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 24f3d4d1791 [SPARK-42743][SQL] Support analyze TimestampNTZ columns
24f3d4d1791 is described below

commit 24f3d4d17913ec90a48ecf9dd23b4db7c19d10c2
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Fri Mar 10 14:33:45 2023 +0300

    [SPARK-42743][SQL] Support analyze TimestampNTZ columns
    
    ### What changes were proposed in this pull request?
    
    Support analyzing TimestampNTZ columns with the `ANALYZE TABLE` command:
    ```
    ANALYZE TABLE table_name [ PARTITION clause ]
        COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col1 [, ...] | FOR ALL COLUMNS ]
    ```
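    
    For illustration, a minimal sketch of how the command could be exercised on a TimestampNTZ column (the table name `events` and column `event_time` are hypothetical, and `spark` is the session provided by `spark-shell`):
    ```
    // Create a table with a TIMESTAMP_NTZ column and collect column-level statistics for it.
    spark.sql("CREATE TABLE events (id INT, event_time TIMESTAMP_NTZ) USING parquet")
    spark.sql("ANALYZE TABLE events COMPUTE STATISTICS FOR COLUMNS event_time")
    // The column stats (distinct_count, min, max, null_count, avg/max col len) should now be visible:
    spark.sql("DESC EXTENDED events event_time").show(truncate = false)
    ```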
    
    ### Why are the changes needed?
    
    Support computing statistics for TimestampNTZ columns, which can be used for query optimization.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The TimestampNTZ type has not been released yet.
    
    ### How was this patch tested?
    
    Updated existing unit tests.
    
    Closes #40362 from gengliangwang/analyzeColumn.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     |  9 +++++--
 .../sql/catalyst/util/TimestampFormatter.scala     |  8 ++++++
 .../execution/command/AnalyzeColumnCommand.scala   |  5 ++--
 .../spark/sql/execution/command/CommandUtils.scala |  6 ++---
 .../spark/sql/StatisticsCollectionTestBase.scala   | 29 +++++++++++++++++-----
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  3 ++-
 6 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 829c121c583..6f4c4f27efc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -661,11 +661,13 @@ object CatalogColumnStat extends Logging {
   def getTimestampFormatter(
       isParsing: Boolean,
       format: String = "yyyy-MM-dd HH:mm:ss.SSSSSS",
-      zoneId: ZoneId = ZoneOffset.UTC): TimestampFormatter = {
+      zoneId: ZoneId = ZoneOffset.UTC,
+      forTimestampNTZ: Boolean = false): TimestampFormatter = {
     TimestampFormatter(
       format = format,
       zoneId = zoneId,
-      isParsing = isParsing)
+      isParsing = isParsing,
+      forTimestampNTZ = forTimestampNTZ)
   }
 
   /**
@@ -702,6 +704,9 @@ object CatalogColumnStat extends Logging {
     val externalValue = dataType match {
       case DateType => DateFormatter().format(v.asInstanceOf[Int])
       case TimestampType => getTimestampFormatter(isParsing = false).format(v.asInstanceOf[Long])
+      case TimestampNTZType =>
+        getTimestampFormatter(isParsing = false, forTimestampNTZ = true)
+          .format(v.asInstanceOf[Long])
       case BooleanType | _: IntegralType | FloatType | DoubleType => v
       case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
       // This version of Spark does not use min/max for binary/string types so we ignore it.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 8ebe77978b5..392e8ebdc6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -525,6 +525,14 @@ object TimestampFormatter {
     getFormatter(Some(format), zoneId, isParsing = isParsing)
   }
 
+  def apply(
+      format: String,
+      zoneId: ZoneId,
+      isParsing: Boolean,
+      forTimestampNTZ: Boolean): TimestampFormatter = {
+    getFormatter(Some(format), zoneId, isParsing = isParsing, forTimestampNTZ = forTimestampNTZ)
+  }
+
   def apply(zoneId: ZoneId): TimestampFormatter = {
     getFormatter(None, zoneId, isParsing = false)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d821b127e06..299f41eb55e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{DatetimeType, _}
 
 
 /**
@@ -139,8 +139,7 @@ case class AnalyzeColumnCommand(
     case _: DecimalType => true
     case DoubleType | FloatType => true
     case BooleanType => true
-    case DateType => true
-    case TimestampType => true
+    case _: DatetimeType => true
     case BinaryType | StringType => true
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index d847868c0ce..c656bdbafa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -329,8 +329,7 @@ object CommandUtils extends Logging {
     case _: IntegralType => true
     case _: DecimalType => true
     case DoubleType | FloatType => true
-    case DateType => true
-    case TimestampType => true
+    case _: DatetimeType => true
     case _ => false
   }
 
@@ -379,8 +378,7 @@ object CommandUtils extends Logging {
       case _: DecimalType => fixedLenTypeStruct
       case DoubleType | FloatType => fixedLenTypeStruct
       case BooleanType => fixedLenTypeStruct
-      case DateType => fixedLenTypeStruct
-      case TimestampType => fixedLenTypeStruct
+      case _: DatetimeType => fixedLenTypeStruct
       case BinaryType | StringType =>
         // For string and binary type, we don't compute min, max or histogram
         val nullLit = Literal(null, col.dataType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 6c6ef1a118f..04e47ac4a11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.{lang => jl}
 import java.io.File
 import java.sql.{Date, Timestamp}
+import java.time.LocalDateTime
 
 import scala.collection.mutable
 import scala.util.Random
@@ -59,10 +60,12 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
   private val t1Internal = date(2016, 5, 8, 0, 0, 1, 123456)
   private val t1 = new Timestamp(DateTimeUtils.microsToMillis(t1Internal))
   t1.setNanos(123456000)
+  private val tsNTZ1 = LocalDateTime.parse(t1Str.replace(" ", "T"))
   private val t2Str = "2016-05-09 00:00:02.987654"
   private val t2Internal = date(2016, 5, 9, 0, 0, 2, 987654)
   private val t2 = new Timestamp(DateTimeUtils.microsToMillis(t2Internal))
   t2.setNanos(987654000)
+  private val tsNTZ2 = LocalDateTime.parse(t2Str.replace(" ", "T"))
 
   private val double1 = 1.123456789
   private val double2 = 6.987654321
@@ -74,14 +77,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
   protected val data = Seq[
     (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
       jl.Double, jl.Float, java.math.BigDecimal,
-      String, Array[Byte], Date, Timestamp,
+      String, Array[Byte], Date, Timestamp, LocalDateTime,
       Seq[Int])](
     // scalastyle:off nonascii
     (false, 1.toByte, 1.toShort, 1, 1L, double1, 1.12345f,
-      dec1, "string escrito en español", "b1".getBytes, d1, t1, null),
+      dec1, "string escrito en español", "b1".getBytes, d1, t1, tsNTZ1, null),
     (true, 2.toByte, 30000.toShort, 40000000, 5536453629L, double2, 7.54321f,
-      dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, null),
-    (null, null, null, null, null, null, null, null, null, null, null, null, null)
+      dec2, "日本語で書かれたstring", "a string full of bytes".getBytes, d2, t2, tsNTZ2, null),
+    (null, null, null, null, null, null, null, null, null, null, null, null, null, null)
     // scalastyle:on nonascii
   )
 
@@ -103,6 +106,8 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     "cdate" -> CatalogColumnStat(Some(2), Some(d1Str), Some(d2Str),
       Some(1), Some(4), Some(4)),
     "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1Str),
+      Some(t2Str), Some(1), Some(8), Some(8)),
+    "ctimestamp_ntz" -> CatalogColumnStat(Some(2), Some(t1Str),
       Some(t2Str), Some(1), Some(8), Some(8))
   )
 
@@ -136,6 +141,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     colStats.update("ctimestamp", stats("ctimestamp").copy(histogram =
       Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
         HistogramBin(t1Internal, t2Internal, 1))))))
+    colStats.update("ctimestamp_ntz", stats("ctimestamp_ntz").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
+        HistogramBin(t1Internal, t2Internal, 1))))))
     colStats
   }
 
@@ -220,7 +228,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
     "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.123456",
     "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
-    "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion
+    "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion,
+    "spark.sql.statistics.colStats.ctimestamp_ntz.avgLen" -> "8",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.max" -> "2016-05-09 00:00:02.987654",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.maxLen" -> "8",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.min" -> "2016-05-08 00:00:01.123456",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.nullCount" -> "1",
+    "spark.sql.statistics.colStats.ctimestamp_ntz.version" -> strVersion
   )
 
   val expectedSerializedHistograms = Map(
@@ -241,7 +256,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     "spark.sql.statistics.colStats.cdate.histogram" ->
       HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
     "spark.sql.statistics.colStats.ctimestamp.histogram" ->
-      HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
+      HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get),
+    "spark.sql.statistics.colStats.ctimestamp_ntz.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("ctimestamp_ntz").histogram.get)
   )
 
   private val randomName = new Random(31)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 65fd2f72727..507c482525c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1154,7 +1154,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val tableName = "column_stats_test_de"
     // (data.head.productArity - 1) because the last column does not support stats collection.
     assert(stats.size == data.head.productArity - 1)
-    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+    // Hive can't parse data type "timestamp_ntz"
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*).drop("ctimestamp_ntz")
 
     withTable(tableName) {
       df.write.saveAsTable(tableName)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org