Posted to commits@spark.apache.org by do...@apache.org on 2020/12/21 11:30:31 UTC
[spark] branch branch-2.4 updated: [SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 59cee43 [SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value
59cee43 is described below
commit 59cee43983875f63e9b32f1e380a05313aa2bb02
Author: angerszhu <an...@gmail.com>
AuthorDate: Mon Dec 21 03:27:17 2020 -0800
[SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value
### What changes were proposed in this pull request?
Currently, when the Parquet vectorized reader is enabled, using a binary type as the partition column returns an incorrect value (the partition value reads back empty), as the UT below demonstrates:
```scala
test("Parquet vector reader incorrect with binary partition value") {
Seq(false, true).foreach(tag => {
withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
withTable("t1") {
sql(
"""CREATE TABLE t1(name STRING, id BINARY, part BINARY)
| USING PARQUET PARTITIONED BY (part)""".stripMargin)
sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
if (tag) {
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
Row("a", "Spark SQL", ""))
} else {
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
Row("a", "Spark SQL", "Spark SQL"))
}
}
}
})
}
```
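For reference, the hex literal `X'537061726B2053514C'` is the UTF-8 encoding of `Spark SQL`, so the `id` column and the `part` partition value hold the same bytes. A minimal sketch of a standalone reproduction, assuming a `spark-shell` session on branch-2.4 without this patch (the `spark` SparkSession is provided by the shell):
```scala
// Assumption: spark-shell on branch-2.4 before this fix.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.sql("CREATE TABLE t1(name STRING, id BINARY, part BINARY) USING PARQUET PARTITIONED BY (part)")
spark.sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
// The regular binary column reads back correctly, but the binary
// *partition* value comes back empty until this patch is applied.
spark.sql("SELECT name, cast(id AS STRING), cast(part AS STRING) FROM t1").show()
```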
### Why are the changes needed?
Fixes a data correctness issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UTs.
Closes #30840 from AngersZhuuuu/spark-33593-2.4.
Authored-by: angerszhu <an...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../execution/vectorized/ColumnVectorUtils.java | 5 +
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 +++++
.../orc/OrcColumnarBatchReaderSuite.scala | 105 +++++++++++++++++++++
.../datasources/parquet/ParquetIOSuite.scala | 9 +-
4 files changed, 143 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 829f3ce..0792f5b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -54,6 +54,8 @@ public class ColumnVectorUtils {
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public class ColumnVectorUtils {
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+          " in column vectorized reader.", t.sql()));
       }
     }
   }
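The root cause lives in `ColumnVectorUtils.populate`, which fills a whole column vector with the constant partition value: `BinaryType` had no branch in the type dispatch, so a binary partition value silently fell through and left the vector empty (the `""` in the UT above). The hunk above adds a `putByteArray` branch and turns the silent fall-through into an explicit error. A rough Scala sketch of the post-patch dispatch, for illustration only (the real code is the Java above; the elided branches follow it):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.types._

// Illustrative rendering of ColumnVectorUtils.populate's type dispatch after
// this patch; `capacity` is the number of rows to fill with the constant.
def fillConstant(col: WritableColumnVector, row: InternalRow,
    fieldIdx: Int, capacity: Int): Unit = col.dataType() match {
  case BooleanType => col.putBooleans(0, capacity, row.getBoolean(fieldIdx))
  case BinaryType  => col.putByteArray(0, row.getBinary(fieldIdx)) // the new branch
  case ByteType    => col.putBytes(0, capacity, row.getByte(fieldIdx))
  // ... remaining primitive/decimal/date/timestamp branches elided ...
  case other => throw new RuntimeException(
    s"DataType ${other.sql} is not supported in column vectorized reader.")
}
```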
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ee7ef55..ab2a1c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3114,6 +3114,32 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }
 
 case class Foo(bar: Option[String])
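A quick sanity check, not part of the patch, that the `X'..'` literal used by these tests really decodes to `Spark SQL`:
```scala
// Decode the hex literal by hand and compare against the expected string.
val hex = "537061726B2053514C"
val bytes = hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
assert(new String(bytes, "UTF-8") == "Spark SQL")
```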
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
new file mode 100644
index 0000000..1ab4105
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val conf = sqlContext.conf
+        val dataSchema = StructType(StructField("col1", IntegerType) :: Nil)
+        val partitionSchema = StructType(StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(dataSchema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(
+          conf.offHeapColumnVectorEnabled, conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK), 4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            Array(0),
+            dataSchema.toArray,
+            partitionSchema,
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
+}
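One detail the new suite relies on: the ORC file schema handed to `initBatch` is produced by parsing Spark's `simpleString` rendering of the Catalyst schema with `TypeDescription.fromString`. A small illustration, not part of the patch:
```scala
import org.apache.orc.TypeDescription
import org.apache.spark.sql.types._

// Shows the schema round-trip the suite relies on.
val dataSchema = StructType(StructField("col1", IntegerType) :: Nil)
println(dataSchema.simpleString)                      // struct<col1:int>
val orcSchema = TypeDescription.fromString(dataSchema.simpleString)
println(orcSchema)                                    // struct<col1:int>
```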
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6b05b9c..dff1152 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -734,7 +734,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
 
       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
 
       val constantValues =
@@ -742,6 +742,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         Seq(
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -769,7 +770,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }