Posted to commits@spark.apache.org by do...@apache.org on 2020/12/18 08:09:18 UTC
[spark] branch branch-3.1 updated: [SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 8a269c7 [SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
8a269c7 is described below
commit 8a269c79728460fc83050fb5cb1171b8bf2e9521
Author: angerszhu <an...@gmail.com>
AuthorDate: Fri Dec 18 00:01:13 2020 -0800
[SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
### What changes were proposed in this pull request?
Currently, when the Parquet vectorized reader is enabled, using a binary type as a partition column returns an incorrect value, as the UT below demonstrates:
```scala
test("Parquet vector reader incorrect with binary partition value") {
Seq(false, true).foreach(tag => {
withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
withTable("t1") {
sql(
"""CREATE TABLE t1(name STRING, id BINARY, part BINARY)
| USING PARQUET PARTITIONED BY (part)""".stripMargin)
sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
if (tag) {
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
Row("a", "Spark SQL", ""))
} else {
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
Row("a", "Spark SQL", "Spark SQL"))
}
}
}
})
}
```
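(For reference, the hex literal X'537061726B2053514C' in the INSERT above is simply the UTF-8 encoding of the string "Spark SQL", which a quick Scala check confirms:)

```scala
// Hex-encode the UTF-8 bytes of "Spark SQL"; prints 537061726B2053514C,
// matching the X'...' literal used in the test above.
println("Spark SQL".getBytes("UTF-8").map(b => "%02X".format(b)).mkString)
```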
### Why are the changes needed?
Fix the issue of incorrect data being returned.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT.
Closes #30824 from AngersZhuuuu/SPARK-33593.
Authored-by: angerszhu <an...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 0603913c666bae1a9640f2f1469fe50bc59e461d)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../execution/vectorized/ColumnVectorUtils.java | 5 ++
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 ++++++++
.../orc/OrcColumnarBatchReaderSuite.scala | 77 +++++++++++++++++++++-
.../datasources/parquet/ParquetIOSuite.scala | 9 ++-
4 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index bce6aa2..25aabcd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -54,6 +54,8 @@ public class ColumnVectorUtils {
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public class ColumnVectorUtils {
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+          " in column vectorized reader.", t.sql()));
       }
     }
   }
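The hunks above are the actual fix: ColumnVectorUtils.populate() fills a whole column vector with a constant partition value, but it previously had no branch for BinaryType, so binary partition columns were silently left empty (and the new trailing else turns any future unhandled type into an explicit error instead of a silent fall-through). A minimal sketch of the newly covered path, written against Spark's OnHeapColumnVector (a sketch only, not the reader's exact call sequence):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.DataTypes

// Allocate a vector for a binary column and write the constant value once,
// mirroring the new BinaryType branch in populate().
val col = new OnHeapColumnVector(4096, DataTypes.BinaryType)
val partValue = "Spark SQL".getBytes("UTF-8")
col.putByteArray(0, partValue)
assert(col.getBinary(0).sameElements(partValue))
col.close()
```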
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2eeb729..03520ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3732,6 +3732,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true))
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }
 
 case class Foo(bar: Option[String])
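For reference, SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key and SQLConf.ORC_VECTORIZED_READER_ENABLED.key resolve to spark.sql.parquet.enableVectorizedReader and spark.sql.orc.enableVectorizedReader, so the new end-to-end test covers both file sources with the vectorized readers both enabled and disabled, expecting the correct partition value in every combination.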
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index 719bf91..bfcef46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -17,16 +17,29 @@
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.unsafe.types.UTF8String.fromString
 
 class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
   private val dataSchema = StructType.fromDDL("col1 int, col2 int")
   private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
   private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
     }
   }
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(schema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            schema.toArray,
+            Array(0, -1),
+            Array(-1, 0),
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
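A note on the initBatch call in the new test: judging by the parameter names in OrcColumnarBatchReader, the two integer arrays map each required field to its source, with -1 meaning "not provided here". Array(0, -1) reads col1 from column 0 of the ORC file, while Array(-1, 0) fills pcol from position 0 of the partitionValues row, which is exactly the constant-population path that the ColumnVectorUtils.populate() fix above covers.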
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d13b3e5..c69f2e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -790,7 +790,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
 
       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
 
       val constantValues =
@@ -798,6 +798,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }