You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2021/11/18 02:47:05 UTC
[spark] branch master updated: [SPARK-36346][SQL] Support TimestampNTZ type in Orc file source
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6f6a066 [SPARK-36346][SQL] Support TimestampNTZ type in Orc file source
6f6a066 is described below
commit 6f6a066dff7e1ceceb80ab5c3809d5e6fb1474e2
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Nov 18 10:46:15 2021 +0800
[SPARK-36346][SQL] Support TimestampNTZ type in Orc file source
### What changes were proposed in this pull request?
As per https://orc.apache.org/docs/types.html, ORC includes two different forms of timestamps from the SQL world:
`Timestamp` is a date and time without a time zone, which does not change based on the time zone of the reader.
`Timestamp with local time zone` is a fixed instant in time, which does change based on the time zone of the reader.
So the timestamp types map between Spark and ORC as follows:
- `TIMESTAMP_LTZ` => `Timestamp with local time zone`.
- `TIMESTAMP_NTZ` => `Timestamp`
Unfortunately, Spark 3.1 and earlier mistakenly treated ORC `Timestamp` as `TIMESTAMP_LTZ`.
This behavior is incorrect, but to keep backward compatibility we do not change it now.
Since 3.2, with support for the timestamp without time zone type:
- For the ORC writer, we map both Spark’s `TimestampLTZ` and `TimestampNTZ` to ORC’s `Timestamp` (ORC's timestamp without time zone) for backward compatibility. In addition, the ORC writer records the Spark column type in the ORC type attributes under the key “spark.sql.catalyst.type”.
- For the ORC reader: if the type information contains the key “spark.sql.catalyst.type”, the data type is inferred from its value.
Otherwise, an ORC `Timestamp with local time zone` column is inferred as Spark’s `TimestampType`, and an ORC `Timestamp` column is also inferred as `TimestampType` (`TIMESTAMP_LTZ`), so that the behavior stays consistent with older Spark versions.
### Why are the changes needed?
To map Spark's `TimestampNTZ` type to ORC's `Timestamp` type.
### Does this PR introduce _any_ user-facing change?
Yes.
Spark can now read and write `TimestampNTZ` columns as ORC `Timestamp` columns.
### How was this patch tested?
New tests.
Closes #33588 from beliefer/SPARK-36346.
Lead-authored-by: Jiaan Geng <be...@163.com>
Co-authored-by: gengjiaan <ge...@360.cn>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../spark/sql/errors/QueryExecutionErrors.scala | 4 ++
.../datasources/orc/OrcAtomicColumnVector.java | 10 ++++
.../datasources/orc/OrcDeserializer.scala | 3 ++
.../execution/datasources/orc/OrcFileFormat.scala | 18 -------
.../execution/datasources/orc/OrcSerializer.scala | 5 +-
.../sql/execution/datasources/orc/OrcUtils.scala | 44 +++++++++++++--
.../execution/datasources/v2/orc/OrcWrite.scala | 4 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 2 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 62 ++++++++++++++++++++++
.../sql/execution/datasources/orc/OrcTest.scala | 7 +++
10 files changed, 132 insertions(+), 27 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index d7cd8e1..ba3dd52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1889,5 +1889,9 @@ object QueryExecutionErrors {
def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = {
new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported")
}
+
+ def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
+ new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index c2d8334..b4f7b99 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
@@ -36,6 +37,7 @@ import org.apache.spark.unsafe.types.UTF8String;
*/
public class OrcAtomicColumnVector extends OrcColumnVector {
private final boolean isTimestamp;
+ private final boolean isTimestampNTZ;
private final boolean isDate;
// Column vector for each type. Only 1 is populated for any type.
@@ -54,6 +56,12 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
isTimestamp = false;
}
+ if (type instanceof TimestampNTZType) {
+ isTimestampNTZ = true;
+ } else {
+ isTimestampNTZ = false;
+ }
+
if (type instanceof DateType) {
isDate = true;
} else {
@@ -105,6 +113,8 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
int index = getRowIndex(rowId);
if (isTimestamp) {
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
+ } else if (isTimestampNTZ) {
+ return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 9140833..7ab556e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -129,6 +129,9 @@ class OrcDeserializer(
case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
+ case TimestampNTZType => (ordinal, value) =>
+ updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))
+
case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val v = OrcShimUtils.getDecimal(value)
v.changePrecision(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 26af2c3..ce851c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -42,24 +42,6 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, Utils}
-private[sql] object OrcFileFormat {
-
- def getQuotedSchemaString(dataType: DataType): String = dataType match {
- case _: DayTimeIntervalType => LongType.catalogString
- case _: YearMonthIntervalType => IntegerType.catalogString
- case _: AtomicType => dataType.catalogString
- case StructType(fields) =>
- fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}")
- .mkString("struct<", ",", ">")
- case ArrayType(elementType, _) =>
- s"array<${getQuotedSchemaString(elementType)}>"
- case MapType(keyType, valueType, _) =>
- s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>"
- case _ => // UDT and others
- dataType.catalogString
- }
-}
-
/**
* New ORC File Format based on Apache ORC.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index 9a1eb8a..edd5052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc
import org.apache.hadoop.io._
-import org.apache.orc.TypeDescription
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.spark.sql.catalyst.InternalRow
@@ -148,6 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
result.setNanos(ts.getNanos)
result
+ case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))
+
case DecimalType.Fixed(precision, scale) =>
OrcShimUtils.getHiveDecimalWritable(precision, scale)
@@ -214,6 +215,6 @@ class OrcSerializer(dataSchema: StructType) {
* Return a Orc value object for the given Spark schema.
*/
private def createOrcValue(dataType: DataType) = {
- OrcStruct.createValue(TypeDescription.fromString(OrcFileFormat.getQuotedSchemaString(dataType)))
+ OrcStruct.createValue(OrcUtils.orcTypeDescription(dataType))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 17aab36..d1b7e8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.orc
import java.nio.charset.StandardCharsets.UTF_8
+import java.sql.Timestamp
import java.util.Locale
import scala.collection.JavaConverters._
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.mapred.OrcTimestamp
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +39,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -197,7 +200,18 @@ object OrcUtils extends Logging {
requiredSchema: StructType,
reader: Reader,
conf: Configuration): Option[(Array[Int], Boolean)] = {
- val orcFieldNames = reader.getSchema.getFieldNames.asScala
+ def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
+ orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
+ case (TimestampType, TimestampNTZType) =>
+ throw QueryExecutionErrors.cannotConvertOrcTimestampToTimestampNTZError()
+ case (t1: StructType, t2: StructType) => checkTimestampCompatibility(t1, t2)
+ case _ =>
+ }
+ }
+
+ val orcSchema = reader.getSchema
+ checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
+ val orcFieldNames = orcSchema.getFieldNames.asScala
val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
if (orcFieldNames.isEmpty) {
// SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -278,6 +292,7 @@ object OrcUtils extends Logging {
s"array<${orcTypeDescriptionString(a.elementType)}>"
case m: MapType =>
s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
+ case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
case _: DayTimeIntervalType => LongType.catalogString
case _: YearMonthIntervalType => IntegerType.catalogString
case _ => dt.catalogString
@@ -287,15 +302,23 @@ object OrcUtils extends Logging {
def getInnerTypeDecription(dt: DataType): Option[TypeDescription] = {
dt match {
case y: YearMonthIntervalType =>
- val typeDesc = orcTypeDescription(IntegerType)
+ val typeDesc = new TypeDescription(TypeDescription.Category.INT)
typeDesc.setAttribute(
CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
Some(typeDesc)
case d: DayTimeIntervalType =>
- val typeDesc = orcTypeDescription(LongType)
+ val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
typeDesc.setAttribute(
CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
Some(typeDesc)
+ case n: TimestampNTZType =>
+ val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+ typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
+ Some(typeDesc)
+ case t: TimestampType =>
+ val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+ typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName)
+ Some(typeDesc)
case _ => None
}
}
@@ -506,4 +529,17 @@ object OrcUtils extends Logging {
resultRow
}
}
+
+ def fromOrcNTZ(ts: Timestamp): Long = {
+ DateTimeUtils.millisToMicros(ts.getTime) +
+ (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
+ }
+
+ def toOrcNTZ(micros: Long): OrcTimestamp = {
+ val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
+ val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+ val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
+ result.setNanos(nanos.toInt)
+ result
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
index 286e871..1ac9266e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
@@ -23,7 +23,7 @@ import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcOutputWriter, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -43,7 +43,7 @@ case class OrcWrite(
val conf = job.getConfiguration
- conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcFileFormat.getQuotedSchemaString(dataSchema))
+ conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3f2f12d..5180908 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -697,7 +697,7 @@ class FileBasedDataSourceSuite extends QueryTest
test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf(SQLConf.FILE_COMPRESSION_FACTOR.key -> compressionFactor.toString,
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "350") {
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "457") {
withTempPath { workDir =>
// the file size is 504 bytes
val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e4c33e96f..2d6978a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
+import java.time.{LocalDateTime, ZoneOffset}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -768,6 +769,67 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
}
}
}
+
+ test("Read/write all timestamp types") {
+ val data = (0 to 255).map { i =>
+ (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i))
+ } :+ (null, null)
+
+ withOrcFile(data) { file =>
+ withAllOrcReaders {
+ checkAnswer(spark.read.orc(file), data.toDF().collect())
+ }
+ }
+ }
+
+ test("SPARK-36346: can't read TimestampLTZ as TimestampNTZ") {
+ val data = (1 to 10).map { i =>
+ val ts = new Timestamp(i)
+ Row(ts)
+ }
+ val answer = (1 to 10).map { i =>
+ // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
+ // as input. So here we multiple the `nanoOfSecond` by NANOS_PER_MILLIS
+ val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
+ Row(ts)
+ }
+ val actualSchema = StructType(Seq(StructField("time", TimestampType, false)))
+ val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
+
+ withTempPath { file =>
+ val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
+ df.write.orc(file.getCanonicalPath)
+ withAllOrcReaders {
+ val msg = intercept[SparkException] {
+ spark.read.schema(providedSchema).orc(file.getCanonicalPath).collect()
+ }.getMessage
+ assert(msg.contains("Unable to convert timestamp of Orc to data type 'timestamp_ntz'"))
+ }
+ }
+ }
+
+ test("SPARK-36346: read TimestampNTZ as TimestampLTZ") {
+ val data = (1 to 10).map { i =>
+ // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
+ // as input. So here we multiple the `nanoOfSecond` by NANOS_PER_MILLIS
+ val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
+ Row(ts)
+ }
+ val answer = (1 to 10).map { i =>
+ val ts = new java.sql.Timestamp(i)
+ Row(ts)
+ }
+ val actualSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
+ val providedSchema = StructType(Seq(StructField("time", TimestampType, false)))
+
+ withTempPath { file =>
+ val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
+ df.write.orc(file.getCanonicalPath)
+ withAllOrcReaders {
+ checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer)
+ }
+ }
+ }
}
class OrcV1QuerySuite extends OrcQuerySuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 4243318..cd87374 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -143,6 +143,13 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
spark.read.orc(file.getAbsolutePath)
}
+ def withAllOrcReaders(code: => Unit): Unit = {
+ // test the row-based reader
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")(code)
+ // test the vectorized reader
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true")(code)
+ }
+
/**
* Takes a sequence of products `data` to generate multi-level nested
* dataframes as new test data. It tests both non-nested and nested dataframes
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org