You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/18 08:09:18 UTC

[spark] branch branch-3.1 updated: [SPARK-33593][SQL] Vector reader got incorrect data with binary partition value

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8a269c7  [SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
8a269c7 is described below

commit 8a269c79728460fc83050fb5cb1171b8bf2e9521
Author: angerszhu <an...@gmail.com>
AuthorDate: Fri Dec 18 00:01:13 2020 -0800

    [SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
    
    Currently when enable parquet vectorized reader, use binary type as partition col will return incorrect value as below UT
    ```scala
    test("Parquet vector reader incorrect with binary partition value") {
      Seq(false, true).foreach(tag => {
        withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
          withTable("t1") {
            sql(
              """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
                | USING PARQUET PARTITIONED BY (part)""".stripMargin)
            sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
            if (tag) {
              checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
                Row("a", "Spark SQL", ""))
            } else {
              checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
                Row("a", "Spark SQL", "Spark SQL"))
            }
          }
        }
      })
    }
    ```
    
    Fix data incorrect issue
    
    No
    
    Added UT
    
    Closes #30824 from AngersZhuuuu/SPARK-33593.
    
    Authored-by: angerszhu <an...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 0603913c666bae1a9640f2f1469fe50bc59e461d)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/vectorized/ColumnVectorUtils.java    |  5 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 ++++++++
 .../orc/OrcColumnarBatchReaderSuite.scala          | 77 +++++++++++++++++++++-
 .../datasources/parquet/ParquetIOSuite.scala       |  9 ++-
 4 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index bce6aa2..25aabcd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -54,6 +54,8 @@ public class ColumnVectorUtils {
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public class ColumnVectorUtils {
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+            " in column vectorized reader.", t.sql()));
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2eeb729..03520ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3732,6 +3732,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true))
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index 719bf91..bfcef46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -17,16 +17,29 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.unsafe.types.UTF8String.fromString
 
 class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
   private val dataSchema = StructType.fromDDL("col1 int, col2 int")
   private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
   private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
     }
   }
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(schema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            schema.toArray,
+            Array(0, -1),
+            Array(-1, 0),
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d13b3e5..c69f2e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -790,7 +790,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
 
       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
 
       val constantValues =
@@ -798,6 +798,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org