You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/21 11:30:31 UTC

[spark] branch branch-2.4 updated: [SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 59cee43  [SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value
59cee43 is described below

commit 59cee43983875f63e9b32f1e380a05313aa2bb02
Author: angerszhu <an...@gmail.com>
AuthorDate: Mon Dec 21 03:27:17 2020 -0800

    [SPARK-33593][SQL][2.4] Vector reader got incorrect data with binary partition value
    
    ### What changes were proposed in this pull request?
    
    Currently when enable parquet vectorized reader, use binary type as partition col will return incorrect value as below UT
    ```scala
    test("Parquet vector reader incorrect with binary partition value") {
      Seq(false, true).foreach(tag => {
        withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
          withTable("t1") {
            sql(
              """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
                | USING PARQUET PARTITIONED BY (part)""".stripMargin)
            sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
            if (tag) {
              checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
                Row("a", "Spark SQL", ""))
            } else {
              checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
                Row("a", "Spark SQL", "Spark SQL"))
            }
          }
        }
      })
    }
    ```
    
    ### Why are the changes needed?
    Fix data incorrect issue
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #30840 from AngersZhuuuu/spark-33593-2.4.
    
    Authored-by: angerszhu <an...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../execution/vectorized/ColumnVectorUtils.java    |   5 +
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  26 +++++
 .../orc/OrcColumnarBatchReaderSuite.scala          | 105 +++++++++++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala       |   9 +-
 4 files changed, 143 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 829f3ce..0792f5b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -54,6 +54,8 @@ public class ColumnVectorUtils {
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public class ColumnVectorUtils {
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+            " in column vectorized reader.", t.sql()));
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ee7ef55..ab2a1c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3114,6 +3114,32 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
new file mode 100644
index 0000000..1ab4105
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val conf = sqlContext.conf
+        val dataSchema = StructType(StructField("col1", IntegerType) :: Nil)
+        val partitionSchema = StructType(StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(dataSchema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(
+          conf.offHeapColumnVectorEnabled, conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK), 4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            Array(0),
+            dataSchema.toArray,
+            partitionSchema,
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6b05b9c..dff1152 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -734,7 +734,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
 
       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
 
       val constantValues =
@@ -742,6 +742,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -769,7 +770,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org