Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/14 14:02:44 UTC
[spark] Diff for: [GitHub] cloud-fan closed pull request #23391:
[SPARK-26456][SQL] Cast date/timestamp to string by Date/TimestampFormatter
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ee463bf5eb6ac..ff6a68b290206 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -230,12 +230,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
// [[func]] assumes the input is no longer null because eval already does the null check.
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
+ private lazy val dateFormatter = DateFormatter()
+ private lazy val timestampFormatter = TimestampFormatter(timeZone)
+
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
- case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
+ case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
case TimestampType => buildCast[Long](_,
- t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
+ t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t)))
case ArrayType(et, _) =>
buildCast[ArrayData](_, array => {
val builder = new UTF8StringBuilder
@@ -843,12 +846,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case BinaryType =>
(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
case DateType =>
- (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
- org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
+ val df = JavaCode.global(
+ ctx.addReferenceObj("dateFormatter", dateFormatter),
+ dateFormatter.getClass)
+ (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(${df}.format($c));"""
case TimestampType =>
- val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
+ val tf = JavaCode.global(
+ ctx.addReferenceObj("timestampFormatter", timestampFormatter),
+ timestampFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
- org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
+ org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
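For orientation, a minimal usage sketch (not part of the patch) of what the new interpreted cast path does: each Cast instance lazily builds one formatter and reuses it, replacing the thread-local SimpleDateFormats removed from DateTimeUtils below. The 17867-day value is taken from DateFormatterSuite further down.

    import java.util.TimeZone
    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
    import org.apache.spark.unsafe.types.UTF8String

    val dateFormatter = DateFormatter()                          // "yyyy-MM-dd", Locale.US
    val timestampFormatter = TimestampFormatter(TimeZone.getTimeZone("UTC"))

    // DateType cast: days since epoch -> string.
    val d = UTF8String.fromString(dateFormatter.format(17867))   // "2018-12-02"
    // Timestamp formatting; the cast itself routes this through
    // DateTimeUtils.timestampToString to preserve sub-second digits.
    val t = UTF8String.fromString(timestampFormatter.format(0L)) // "1970-01-01 00:00:00"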
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 8fc0112c02577..e173f8091f869 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -562,7 +562,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
copy(timeZoneId = Option(timeZoneId))
override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
- val df = TimestampFormatter(format.toString, timeZone, Locale.US)
+ val df = TimestampFormatter(format.toString, timeZone)
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}
@@ -667,7 +667,7 @@ abstract class UnixTime
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
- TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+ TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
@@ -700,7 +700,7 @@ abstract class UnixTime
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
- TimestampFormatter(formatString, timeZone, Locale.US).parse(
+ TimestampFormatter(formatString, timeZone).parse(
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
@@ -821,7 +821,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
- TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+ TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
@@ -847,7 +847,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
- UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
+ UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index c47b08729c4e1..adc69ab1c652e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -51,7 +51,14 @@ class Iso8601DateFormatter(
}
object DateFormatter {
+ val defaultPattern: String = "yyyy-MM-dd"
+ val defaultLocale: Locale = Locale.US
+
def apply(format: String, locale: Locale): DateFormatter = {
new Iso8601DateFormatter(format, locale)
}
+
+ def apply(format: String): DateFormatter = apply(format, defaultLocale)
+
+ def apply(): DateFormatter = apply(defaultPattern)
}
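A short sketch of the three apply overloads this hunk introduces; the parse/format values come from the round-trip tests in DateFormatterSuite below.

    import java.util.Locale
    import org.apache.spark.sql.catalyst.util.DateFormatter

    val default = DateFormatter()                         // defaultPattern "yyyy-MM-dd", Locale.US
    val custom  = DateFormatter("yyyy MMM")               // explicit pattern, default locale
    val full    = DateFormatter("yyyy-MM-dd", Locale.US)  // fully explicit, unchanged API

    assert(default.parse("2018-12-02") == 17867)          // days since 1970-01-01
    assert(default.format(17867) == "2018-12-02")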
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index e95117f95cdb8..da8899a02f319 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -92,32 +92,6 @@ object DateTimeUtils {
}
}
- // `SimpleDateFormat` is not thread-safe.
- private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
- override def initialValue(): SimpleDateFormat = {
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
- }
- }
-
- def getThreadLocalTimestampFormat(timeZone: TimeZone): DateFormat = {
- val sdf = threadLocalTimestampFormat.get()
- sdf.setTimeZone(timeZone)
- sdf
- }
-
- // `SimpleDateFormat` is not thread-safe.
- private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
- override def initialValue(): SimpleDateFormat = {
- new SimpleDateFormat("yyyy-MM-dd", Locale.US)
- }
- }
-
- def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
- val sdf = threadLocalDateFormat.get()
- sdf.setTimeZone(timeZone)
- sdf
- }
-
private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
private val computeTimeZone = new JFunction[String, TimeZone] {
override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
@@ -149,24 +123,11 @@ object DateTimeUtils {
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
}
- def dateToString(days: SQLDate): String =
- getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
-
- def dateToString(days: SQLDate, timeZone: TimeZone): String = {
- getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
- }
-
- // Converts Timestamp to string according to Hive TimestampWritable convention.
- def timestampToString(us: SQLTimestamp): String = {
- timestampToString(us, defaultTimeZone())
- }
-
// Converts Timestamp to string according to Hive TimestampWritable convention.
- def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
+ def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
val ts = toJavaTimestamp(us)
val timestampString = ts.toString
- val timestampFormat = getThreadLocalTimestampFormat(timeZone)
- val formatted = timestampFormat.format(ts)
+ val formatted = tf.format(us)
if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
formatted + timestampString.substring(19)
@@ -1168,7 +1129,5 @@ object DateTimeUtils {
*/
private[util] def resetThreadLocals(): Unit = {
threadLocalGmtCalendar.remove()
- threadLocalTimestampFormat.remove()
- threadLocalDateFormat.remove()
}
}
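With this signature change, callers construct the formatter once and pass it in rather than handing timestampToString a TimeZone; a sketch of the new call shape (the output depends on the default time zone, so none is asserted here):

    import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}

    val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
    // Old: DateTimeUtils.timestampToString(us) or timestampToString(us, timeZone)
    // New: the formatter carries the pattern, locale and time zone.
    val s = DateTimeUtils.timestampToString(tf, 1543708800000000L) // microseconds since epoch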
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 10c73b2f99558..1374a825ec6dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -36,7 +36,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[ParseException])
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
- def parse(s: String): Long // returns microseconds since epoch
+ def parse(s: String): Long
def format(us: Long): String
}
@@ -74,7 +74,18 @@ class Iso8601TimestampFormatter(
}
object TimestampFormatter {
+ val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
+ val defaultLocale: Locale = Locale.US
+
def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
new Iso8601TimestampFormatter(format, timeZone, locale)
}
+
+ def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
+ apply(format, timeZone, defaultLocale)
+ }
+
+ def apply(timeZone: TimeZone): TimestampFormatter = {
+ apply(defaultPattern, timeZone, defaultLocale)
+ }
}
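Correspondingly for timestamps, a sketch of the new overloads; the round-trip mirrors TimestampFormatterSuite below.

    import java.util.{Locale, TimeZone}
    import org.apache.spark.sql.catalyst.util.TimestampFormatter

    val utc = TimeZone.getTimeZone("UTC")
    val default = TimestampFormatter(utc)                  // "yyyy-MM-dd HH:mm:ss", Locale.US
    val custom  = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", utc)
    val full    = TimestampFormatter("yyyy-MM-dd HH:mm:ss", utc, Locale.US)

    val micros = custom.parse("2018-12-02T10:11:12.001234") // microseconds since epoch
    assert(custom.format(micros) == "2018-12-02T10:11:12.001234")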
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 2cb6110e2c093..e732eb0ef9816 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -35,10 +35,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
test("nanoseconds truncation") {
+ val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
val parsedTimestampOp = DateTimeUtils.stringToTimestamp(UTF8String.fromString(originalTime))
assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly")
- assert(DateTimeUtils.timestampToString(parsedTimestampOp.get) === expectedParsedTime)
+ assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime)
}
checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
index 2dc55e0e1f633..4d5872c92f5a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
@@ -29,7 +29,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("parsing dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
- val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val formatter = DateFormatter()
val daysSinceEpoch = formatter.parse("2018-12-02")
assert(daysSinceEpoch === 17867)
}
@@ -39,7 +39,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("format dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
- val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val formatter = DateFormatter()
val date = formatter.format(17867)
assert(date === "2018-12-02")
}
@@ -59,7 +59,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
"5010-11-17").foreach { date =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
- val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val formatter = DateFormatter()
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
@@ -82,7 +82,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
1110657).foreach { days =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
- val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val formatter = DateFormatter()
val date = formatter.format(days)
val parsed = formatter.parse(date)
assert(days === parsed)
@@ -92,7 +92,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
}
test("parsing date without explicit day") {
- val formatter = DateFormatter("yyyy MMM", Locale.US)
+ val formatter = DateFormatter("yyyy MMM")
val daysSinceEpoch = formatter.parse("2018 Dec")
assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
index 2ce3eacc30cc0..d007adf3aab86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -41,8 +41,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
- TimeZone.getTimeZone(timeZone),
- Locale.US)
+ TimeZone.getTimeZone(timeZone))
val microsSinceEpoch = formatter.parse(localDate)
assert(microsSinceEpoch === expectedMicros(timeZone))
}
@@ -62,8 +61,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
- TimeZone.getTimeZone(timeZone),
- Locale.US)
+ TimeZone.getTimeZone(timeZone))
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(timeZone))
}
@@ -82,7 +80,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
2177456523456789L,
11858049903010203L).foreach { micros =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
- val formatter = TimestampFormatter(pattern, timeZone, Locale.US)
+ val formatter = TimestampFormatter(pattern, timeZone)
val timestamp = formatter.format(micros)
val parsed = formatter.parse(timestamp)
assert(micros === parsed)
@@ -103,7 +101,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
"2039-01-01T01:02:03.456789",
"2345-10-07T22:45:03.010203").foreach { timestamp =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
- val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+ val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
val micros = formatter.parse(timestamp)
val formatted = formatter.format(micros)
assert(timestamp === formatted)
@@ -114,8 +112,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
test(" case insensitive parsing of am and pm") {
val formatter = TimestampFormatter(
"yyyy MMM dd hh:mm:ss a",
- TimeZone.getTimeZone("UTC"),
- Locale.US)
+ TimeZone.getTimeZone("UTC"))
val micros = formatter.parse("2009 Mar 20 11:30:01 am")
assert(micros === TimeUnit.SECONDS.toMicros(
LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index c90b254a6d121..d3934a0e52de5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -77,6 +77,10 @@ object HiveResult {
TimestampType,
BinaryType)
+ private lazy val dateFormatter = DateFormatter()
+ private lazy val timestampFormatter = TimestampFormatter(
+ DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))
+
/** Hive outputs fields of structs slightly differently than top level attributes. */
private def toHiveStructString(a: (Any, DataType)): String = a match {
case (struct: Row, StructType(fields)) =>
@@ -111,11 +115,9 @@ object HiveResult {
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
}.toSeq.sorted.mkString("{", ",", "}")
case (null, _) => "NULL"
- case (d: Date, DateType) =>
- DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+ case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
case (t: Timestamp, TimestampType) =>
- val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
- DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), timeZone)
+ DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (interval, CalendarIntervalType) => interval.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6458b65466fb5..ee770426e61f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,13 +26,12 @@ import scala.util.Try
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -59,6 +58,8 @@ object PartitionSpec {
object PartitioningUtils {
+ val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
+
private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
{
require(columnNames.size == literals.size)
@@ -122,10 +123,12 @@ object PartitioningUtils {
Map.empty[String, DataType]
}
+ val dateFormatter = DateFormatter()
+ val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
- validatePartitionColumns, timeZone)
+ validatePartitionColumns, timeZone, dateFormatter, timestampFormatter)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -208,7 +211,9 @@ object PartitioningUtils {
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
validatePartitionColumns: Boolean,
- timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
+ timeZone: TimeZone,
+ dateFormatter: DateFormatter,
+ timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -230,7 +235,7 @@ object PartitioningUtils {
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
- validatePartitionColumns, timeZone)
+ validatePartitionColumns, timeZone, dateFormatter, timestampFormatter)
maybeColumn.foreach(columns += _)
// Now, we determine if we should stop.
@@ -265,7 +270,9 @@ object PartitioningUtils {
typeInference: Boolean,
userSpecifiedDataTypes: Map[String, DataType],
validatePartitionColumns: Boolean,
- timeZone: TimeZone): Option[(String, Literal)] = {
+ timeZone: TimeZone,
+ dateFormatter: DateFormatter,
+ timestampFormatter: TimestampFormatter): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -280,7 +287,12 @@ object PartitioningUtils {
// SPARK-26188: if user provides corresponding column schema, get the column value without
// inference, and then cast it as user specified data type.
val dataType = userSpecifiedDataTypes(columnName)
- val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+ val columnValueLiteral = inferPartitionColumnValue(
+ rawColumnValue,
+ false,
+ timeZone,
+ dateFormatter,
+ timestampFormatter)
val columnValue = columnValueLiteral.eval()
val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
if (validatePartitionColumns && columnValue != null && castedValue == null) {
@@ -289,7 +301,12 @@ object PartitioningUtils {
}
Literal.create(castedValue, dataType)
} else {
- inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
+ inferPartitionColumnValue(
+ rawColumnValue,
+ typeInference,
+ timeZone,
+ dateFormatter,
+ timestampFormatter)
}
Some(columnName -> literal)
}
@@ -442,7 +459,9 @@ object PartitioningUtils {
private[datasources] def inferPartitionColumnValue(
raw: String,
typeInference: Boolean,
- timeZone: TimeZone): Literal = {
+ timeZone: TimeZone,
+ dateFormatter: DateFormatter,
+ timestampFormatter: TimestampFormatter): Literal = {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
val bigDecimal = new JBigDecimal(raw)
@@ -457,7 +476,7 @@ object PartitioningUtils {
val dateTry = Try {
// try and parse the date, if no exception occurs this is a candidate to be resolved as
// DateType
- DateTimeUtils.getThreadLocalDateFormat(DateTimeUtils.defaultTimeZone()).parse(raw)
+ dateFormatter.parse(raw)
// SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
// This can happen since DateFormat.parse may not use the entire text of the given string:
// so if there are extra-characters after the date, it returns correctly.
@@ -474,7 +493,7 @@ object PartitioningUtils {
val unescapedRaw = unescapePathName(raw)
// try and parse the date, if no exception occurs this is a candidate to be resolved as
// TimestampType
- DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+ timestampFormatter.parse(unescapedRaw)
// SPARK-23436: see comment for date
val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
// Disallow TimestampType if the cast returned null
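Partition discovery now builds both formatters once per parsePartitions call and threads them through parsePartition, parsePartitionColumn and inferPartitionColumnValue instead of reaching for the removed thread-local formats. A sketch of the wiring follows; note these helpers are private[datasources], so this shape is only callable from that package, exactly as ParquetPartitionDiscoverySuite does below.

    import java.util.TimeZone
    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    val timeZone = TimeZone.getTimeZone("UTC")
    val dateFormatter = DateFormatter()
    val timestampFormatter =
      TimestampFormatter(PartitioningUtils.timestampPartitionPattern, timeZone)

    // Same shape as the suite's check(): infer "2018-12-02" as a typed Literal.
    val literal = PartitioningUtils.inferPartitionColumnValue(
      "2018-12-02", true, timeZone, dateFormatter, timestampFormatter)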
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 51c385e25bee3..13ed105004d70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
@@ -185,10 +185,11 @@ private[sql] object JDBCRelation extends Logging {
columnType: DataType,
timeZoneId: String): String = {
def dateTimeToString(): String = {
- val timeZone = DateTimeUtils.getTimeZone(timeZoneId)
val dateTimeStr = columnType match {
- case DateType => DateTimeUtils.dateToString(value.toInt, timeZone)
- case TimestampType => DateTimeUtils.timestampToString(value, timeZone)
+ case DateType => DateFormatter().format(value.toInt)
+ case TimestampType =>
+ val timestampFormatter = TimestampFormatter(DateTimeUtils.getTimeZone(timeZoneId))
+ DateTimeUtils.timestampToString(timestampFormatter, value)
}
s"'$dateTimeStr'"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index 4205b3f79a972..bbce4705871df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -17,10 +17,27 @@
package org.apache.spark.sql.execution
+import java.sql.{Date, Timestamp}
+
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT}
+import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
+
+class HiveResultSuite extends SparkFunSuite with SharedSQLContext {
+ import testImplicits._
-class HiveResultSuite extends SparkFunSuite {
+ test("date formatting in hive result") {
+ val date = "2018-12-28"
+ val executedPlan = Seq(Date.valueOf(date)).toDS().queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan)
+ assert(result.head == date)
+ }
+
+ test("timestamp formatting in hive result") {
+ val timestamp = "2018-12-28 01:02:03"
+ val executedPlan = Seq(Timestamp.valueOf(timestamp)).toDS().queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan)
+ assert(result.head == timestamp)
+ }
test("toHiveString correctly handles UDTs") {
val point = new ExamplePoint(50.0, 50.0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 88067358667c6..864c1e99fbfb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -56,6 +56,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val timeZone = TimeZone.getDefault()
val timeZoneId = timeZone.getID
+ val df = DateFormatter()
+ val tf = TimestampFormatter(timestampPartitionPattern, timeZone)
protected override def beforeAll(): Unit = {
super.beforeAll()
@@ -69,7 +71,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("column type inference") {
def check(raw: String, literal: Literal, timeZone: TimeZone = timeZone): Unit = {
- assert(inferPartitionColumnValue(raw, true, timeZone) === literal)
+ assert(inferPartitionColumnValue(raw, true, timeZone, df, tf) === literal)
}
check("10", Literal.create(10, IntegerType))
@@ -197,13 +199,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
val actual = parsePartition(new Path(path), true, Set.empty[Path],
- Map.empty, true, timeZone)._1
+ Map.empty, true, timeZone, df, tf)._1
assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone)
+ parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone, df, tf)
}.getMessage
assert(message.contains(expected))
@@ -249,7 +251,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
basePaths = Set(new Path("file://path/a=10")),
Map.empty,
true,
- timeZone = timeZone)._1
+ timeZone = timeZone,
+ df,
+ tf)._1
assert(partitionSpec1.isEmpty)
@@ -260,7 +264,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
basePaths = Set(new Path("file://path")),
Map.empty,
true,
- timeZone = timeZone)._1
+ timeZone = timeZone,
+ df,
+ tf)._1
assert(partitionSpec2 ==
Option(PartitionValues(
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org