Posted to commits@spark.apache.org by we...@apache.org on 2020/02/07 18:35:38 UTC
[spark] branch branch-3.0 updated: [SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a9d9d83 [SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day
a9d9d83 is described below
commit a9d9d834c600a4b9888f8781cec04fcbef29972d
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Sat Feb 8 02:32:07 2020 +0800
[SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day
### What changes were proposed in this pull request?
- Rewrite the `convertTz` method of `DateTimeUtils` using Java 8 time API
- Change the types of the `convertTz` parameters from `TimeZone` to `ZoneId`. This avoids unnecessary `TimeZone` -> `ZoneId` conversions and, as a consequence, performance regressions. (A sketch of the rewritten conversion follows this list.)
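For reference, the rewrite reduces `convertTz` to a single `java.time` round-trip, as the diff below shows. A minimal standalone sketch of the same idea (`shiftZone` is an illustrative name, not Spark API; `DateTimeUtils` additionally converts between microseconds and `Instant`):
```scala
import java.time.{Instant, ZoneId, ZoneOffset}

// Mirror of the rewritten convertTz: render `instant` as a wall-clock time in
// `toZone`, then reinterpret that wall clock as a local time in `fromZone`.
def shiftZone(instant: Instant, fromZone: ZoneId, toZone: ZoneId): Instant =
  instant.atZone(toZone).toLocalDateTime.atZone(fromZone).toInstant

// to_utc_timestamp semantics: treat the stored wall clock as Asia/Hong_Kong
// local time and shift it to UTC.
val hk = ZoneId.of("Asia/Hong_Kong")
val ts = Instant.parse("2019-11-03T12:00:00Z") // rendered as "12:00" in UTC
println(shiftZone(ts, hk, ZoneOffset.UTC))     // 2019-11-03T04:00:00Z
```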
### Why are the changes needed?
- Fixes incorrect behavior of `to_utc_timestamp` on daylight saving day. For example:
```scala
scala> df.select(to_utc_timestamp(lit("2019-11-03T12:00:00"), "Asia/Hong_Kong").as("local UTC")).show
+-------------------+
| local UTC|
+-------------------+
|2019-11-03 03:00:00|
+-------------------+
```
but the result must be `2019-11-03 04:00:00` (see the sanity check after this list):
Screenshot confirming the expected result: https://user-images.githubusercontent.com/1580697/73960846-a129bb00-491c-11ea-92f5-45831cb28a62.png
- Simplifies the code and makes it more maintainable
- Switches `convertTz` to the Proleptic Gregorian calendar used by the Java 8 time classes by default. That makes the function consistent with other date-time functions.
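Both the expected value and the failure mode can be sanity-checked with plain `java.time` (illustrative code, not part of the patch): `Asia/Hong_Kong` has a fixed `+08:00` offset, while the removed `convertTz` routed every conversion through the JVM-default time zone, which is ambiguous on a fall-back day.
```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

// 1. The expected result: Asia/Hong_Kong is UTC+8 year-round (no DST), so the
//    conversion is unambiguous and 12:00 local time is 04:00 UTC.
val hk = ZoneId.of("Asia/Hong_Kong")
val local = LocalDateTime.parse("2019-11-03T12:00:00")
println(local.atZone(hk).withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime)
// prints: 2019-11-03T04:00

// 2. Why the old code could be an hour off: it converted through the
//    JVM-default time zone, and on the fall-back day in America/Los_Angeles
//    some wall-clock times are ambiguous -- 01:30 maps to two valid offsets.
val la = ZoneId.of("America/Los_Angeles")
println(la.getRules.getValidOffsets(LocalDateTime.parse("2019-11-03T01:30:00")))
// prints: [-07:00, -08:00]
```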
### Does this PR introduce any user-facing change?
Yes, after the changes `to_utc_timestamp` returns the correct result `2019-11-03 04:00:00`.
### How was this patch tested?
- By the existing test suites `DateTimeUtilsSuite`, `DateFunctionsSuite`, and `DateExpressionsSuite`.
- Added the test `convert time zones on a daylight saving day` to `DateFunctionsSuite`.
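For an interactive reproduction, a sketch along these lines should work in `spark-shell` (not taken from the patch; the pre-fix value `03:00:00` appears when the JVM-default time zone has a DST transition on that day, e.g. PST):
```scala
import org.apache.spark.sql.functions.{lit, to_utc_timestamp}

// Render timestamps in UTC so the output matches the expected UTC wall clock.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.range(1)
  .select(to_utc_timestamp(lit("2019-11-03T12:00:00"), "Asia/Hong_Kong").as("utc"))
  .show()
// before the fix (JVM default zone PST): 2019-11-03 03:00:00
// after the fix:                         2019-11-03 04:00:00
```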
Closes #27474 from MaxGekk/port-convertTz-on-Java8-api.
Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit a3e77773cfa03a18d31370acd9a10562ff5312bb)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/expressions/datetimeExpressions.scala | 16 +++++-------
.../spark/sql/catalyst/util/DateTimeUtils.scala | 28 ++++-----------------
.../parquet/VectorizedColumnReader.java | 9 ++++---
.../parquet/VectorizedParquetRecordReader.java | 6 ++---
.../datasources/parquet/ParquetFileFormat.scala | 2 +-
.../datasources/parquet/ParquetReadSupport.scala | 5 ++--
.../parquet/ParquetRecordMaterializer.scala | 4 +--
.../datasources/parquet/ParquetRowConverter.scala | 9 +++----
.../v2/parquet/ParquetPartitionReaderFactory.scala | 10 ++++----
.../org/apache/spark/sql/DateFunctionsSuite.scala | 29 ++++++++++++++++++++--
.../parquet/ParquetInteroperabilitySuite.scala | 5 ++--
11 files changed, 64 insertions(+), 59 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 28f1d34..aa2bd5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1176,14 +1176,12 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
|long ${ev.value} = 0;
""".stripMargin)
} else {
- val tzClass = classOf[TimeZone].getName
+ val tzClass = classOf[ZoneId].getName
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val escapedTz = StringEscapeUtils.escapeJava(tz.toString)
val tzTerm = ctx.addMutableState(tzClass, "tz",
- v => s"""$v = $dtu.getTimeZone("$escapedTz");""")
- val utcTerm = "tzUTC"
- ctx.addImmutableStateIfNotExists(tzClass, utcTerm,
- v => s"""$v = $dtu.getTimeZone("UTC");""")
+ v => s"""$v = $dtu.getZoneId("$escapedTz");""")
+ val utcTerm = "java.time.ZoneOffset.UTC"
val eval = left.genCode(ctx)
ev.copy(code = code"""
|${eval.code}
@@ -1382,14 +1380,12 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
|long ${ev.value} = 0;
""".stripMargin)
} else {
- val tzClass = classOf[TimeZone].getName
+ val tzClass = classOf[ZoneId].getName
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val escapedTz = StringEscapeUtils.escapeJava(tz.toString)
val tzTerm = ctx.addMutableState(tzClass, "tz",
- v => s"""$v = $dtu.getTimeZone("$escapedTz");""")
- val utcTerm = "tzUTC"
- ctx.addImmutableStateIfNotExists(tzClass, utcTerm,
- v => s"""$v = $dtu.getTimeZone("UTC");""")
+ v => s"""$v = $dtu.getZoneId("$escapedTz");""")
+ val utcTerm = "java.time.ZoneOffset.UTC"
val eval = left.genCode(ctx)
ev.copy(code = code"""
|${eval.code}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 6800abb..8eb56094 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -801,27 +801,9 @@ object DateTimeUtils {
* mapping, the conversion here may return wrong result, we should make the timestamp
* timezone-aware.
*/
- def convertTz(ts: SQLTimestamp, fromZone: TimeZone, toZone: TimeZone): SQLTimestamp = {
- // We always use local timezone to parse or format a timestamp
- val localZone = defaultTimeZone()
- val utcTs = if (fromZone.getID == localZone.getID) {
- ts
- } else {
- // get the human time using local time zone, that actually is in fromZone.
- val localZoneOffsetMs = localZone.getOffset(MICROSECONDS.toMillis(ts))
- val localTsUs = ts + MILLISECONDS.toMicros(localZoneOffsetMs) // in fromZone
- val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), fromZone)
- localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
- }
- if (toZone.getID == localZone.getID) {
- utcTs
- } else {
- val toZoneOffsetMs = toZone.getOffset(MICROSECONDS.toMillis(utcTs))
- val localTsUs = utcTs + MILLISECONDS.toMicros(toZoneOffsetMs) // in toZone
- // treat it as local timezone, convert to UTC (we could get the expected human time back)
- val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), localZone)
- localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
- }
+ def convertTz(ts: SQLTimestamp, fromZone: ZoneId, toZone: ZoneId): SQLTimestamp = {
+ val rebasedDateTime = microsToInstant(ts).atZone(toZone).toLocalDateTime.atZone(fromZone)
+ instantToMicros(rebasedDateTime.toInstant)
}
/**
@@ -829,7 +811,7 @@ object DateTimeUtils {
* representation in their timezone.
*/
def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
- convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
+ convertTz(time, ZoneOffset.UTC, getZoneId(timeZone))
}
/**
@@ -837,7 +819,7 @@ object DateTimeUtils {
* string representation in their timezone.
*/
def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
- convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
+ convertTz(time, getZoneId(timeZone), ZoneOffset.UTC)
}
/**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ba26b57..3294655 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,8 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.util.Arrays;
-import java.util.TimeZone;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
@@ -98,14 +99,14 @@ public class VectorizedColumnReader {
private final ColumnDescriptor descriptor;
private final OriginalType originalType;
// The timezone conversion to apply to int96 timestamps. Null if no conversion.
- private final TimeZone convertTz;
- private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC();
+ private final ZoneId convertTz;
+ private static final ZoneId UTC = ZoneOffset.UTC;
public VectorizedColumnReader(
ColumnDescriptor descriptor,
OriginalType originalType,
PageReader pageReader,
- TimeZone convertTz) throws IOException {
+ ZoneId convertTz) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index f028613..7306709 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -18,9 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
-import java.util.TimeZone;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -86,7 +86,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
* The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
work around incompatibilities between different engines when writing timestamp values.
*/
- private TimeZone convertTz = null;
+ private ZoneId convertTz = null;
/**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -116,7 +116,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private final MemoryMode MEMORY_MODE;
- public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) {
+ public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) {
this.convertTz = convertTz;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f52aaf0..29dbd8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -295,7 +295,7 @@ class ParquetFileFormat
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
- Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 69c8bad..c05ecf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.util.{Locale, Map => JMap, TimeZone}
+import java.time.ZoneId
+import java.util.{Locale, Map => JMap}
import scala.collection.JavaConverters._
@@ -49,7 +50,7 @@ import org.apache.spark.sql.types._
* Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
* to [[prepareForRead()]], but use a private `var` for simplicity.
*/
-class ParquetReadSupport(val convertTz: Option[TimeZone],
+class ParquetReadSupport(val convertTz: Option[ZoneId],
enableVectorizedReader: Boolean)
extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 3098a33..5622169 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.util.TimeZone
+import java.time.ZoneId
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType
@@ -36,7 +36,7 @@ private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
- convertTz: Option[TimeZone])
+ convertTz: Option[ZoneId])
extends RecordMaterializer[InternalRow] {
private val rootConverter =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 98ac2ec..850adae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
-import java.util.TimeZone
+import java.time.{ZoneId, ZoneOffset}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -125,7 +125,7 @@ private[parquet] class ParquetRowConverter(
schemaConverter: ParquetToSparkSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
- convertTz: Option[TimeZone],
+ convertTz: Option[ZoneId],
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {
@@ -154,8 +154,6 @@ private[parquet] class ParquetRowConverter(
|${catalystType.prettyJson}
""".stripMargin)
- private[this] val UTC = DateTimeUtils.TimeZoneUTC
-
/**
* Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
converted field values to the `ordinal`-th cell in `currentRow`.
@@ -292,7 +290,8 @@ private[parquet] class ParquetRowConverter(
val timeOfDayNanos = buf.getLong
val julianDay = buf.getInt
val rawTime = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
- val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, UTC)).getOrElse(rawTime)
+ val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, ZoneOffset.UTC))
+ .getOrElse(rawTime)
updater.setLong(adjTime)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index b2fc724..047bc74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.v2.parquet
import java.net.URI
-import java.util.TimeZone
+import java.time.ZoneId
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
@@ -117,7 +117,7 @@ case class ParquetPartitionReaderFactory(
file: PartitionedFile,
buildReaderFunc: (
ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate],
- Option[TimeZone]) => RecordReader[Void, T]): RecordReader[Void, T] = {
+ Option[ZoneId]) => RecordReader[Void, T]): RecordReader[Void, T] = {
val conf = broadcastedConf.value.value
val filePath = new Path(new URI(file.filePath))
@@ -156,7 +156,7 @@ case class ParquetPartitionReaderFactory(
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
- Some(DateTimeUtils.getTimeZone(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
@@ -184,7 +184,7 @@ case class ParquetPartitionReaderFactory(
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
- convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
+ convertTz: Option[ZoneId]): RecordReader[Void, InternalRow] = {
logDebug(s"Falling back to parquet-mr")
val taskContext = Option(TaskContext.get())
// ParquetRecordReader returns InternalRow
@@ -213,7 +213,7 @@ case class ParquetPartitionReaderFactory(
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
- convertTz: Option[TimeZone]): VectorizedParquetRecordReader = {
+ convertTz: Option[ZoneId]): VectorizedParquetRecordReader = {
val taskContext = Option(TaskContext.get())
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 3b3d3cc..bb8cdf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
-import java.time.Instant
-import java.util.Locale
+import java.time.{Instant, LocalDateTime}
+import java.util.{Locale, TimeZone}
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -803,4 +803,29 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
checkTimeZoneParsing(null)
}
}
+
+ test("SPARK-30752: convert time zones on a daylight saving day") {
+ val systemTz = "PST"
+ val sessionTz = "UTC"
+ val fromTz = "Asia/Hong_Kong"
+ val fromTs = "2019-11-03T12:00:00" // daylight saving date in PST
+ val utcTs = "2019-11-03T04:00:00"
+ val defaultTz = TimeZone.getDefault
+ try {
+ TimeZone.setDefault(DateTimeUtils.getTimeZone(systemTz))
+ withSQLConf(
+ SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
+ SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz) {
+ val expected = LocalDateTime.parse(utcTs)
+ .atZone(DateTimeUtils.getZoneId(sessionTz))
+ .toInstant
+ val df = Seq(fromTs).toDF("localTs")
+ checkAnswer(
+ df.select(to_utc_timestamp(col("localTs"), fromTz)),
+ Row(expected))
+ }
+ } finally {
+ TimeZone.setDefault(defaultTz)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 1ded34f..649a46f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import java.time.ZoneOffset
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
@@ -145,8 +146,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
impalaFileData.map { ts =>
DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
DateTimeUtils.fromJavaTimestamp(ts),
- DateTimeUtils.TimeZoneUTC,
- DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone)))
+ ZoneOffset.UTC,
+ DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)))
}
}
val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray