Posted to commits@spark.apache.org by li...@apache.org on 2018/04/06 15:35:25 UTC

spark git commit: [SPARK-23822][SQL] Improve error message for Parquet schema mismatches

Repository: spark
Updated Branches:
  refs/heads/master 6ade5cbb4 -> 945240193


[SPARK-23822][SQL] Improve error message for Parquet schema mismatches

## What changes were proposed in this pull request?

This pull request improves the error message Spark produces when reading Parquet files with mismatched schemas, e.g. one file with a STRING column and another with an INT column. A new SchemaColumnConvertNotSupportedException is added to replace the old UnsupportedOperationException. The exception is then caught in FileScanRDD.scala and rethrown as a more general QueryExecutionException that includes the path of the Parquet file which triggered it.
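
For illustration, a minimal sketch of the kind of read that hits this path, mirroring the new test in ParquetSchemaSuite.scala (the scratch path and the `spark` session name are assumptions for the example, not part of this change):

```scala
import spark.implicits._  // assumes a SparkSession named `spark`

val path = "/tmp/parquet-mismatch"  // illustrative scratch directory

// Two files in the same directory whose column `a` has incompatible types.
Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(path)
Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(path)

// Before this change: an opaque UnsupportedOperationException. After it, the
// failure names the offending file and column, e.g.
// "Parquet column cannot be converted in file ... Column: [a],
//  Expected: IntegerType, Found: BINARY"
spark.read.parquet(path).collect()
```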

## How was this patch tested?

Unit tests were added to exercise the new exception and verify the error messages.

Also manually tested with two Parquet files with different schemas to confirm the error message.

Screenshot of the new error message (2018-03-30): https://user-images.githubusercontent.com/37087310/38156580-dd58a140-3433-11e8-973a-b816d859fbe1.png

Author: Yuchen Huo <yu...@databricks.com>

Closes #20953 from yuchenhuo/SPARK-23822.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94524019
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94524019
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94524019

Branch: refs/heads/master
Commit: 94524019315ad463f9bc13c107131091d17c6af9
Parents: 6ade5cb
Author: Yuchen Huo <yu...@databricks.com>
Authored: Fri Apr 6 08:35:20 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Apr 6 08:35:20 2018 -0700

----------------------------------------------------------------------
 ...chemaColumnConvertNotSupportedException.java | 62 ++++++++++++++++++++
 .../parquet/VectorizedColumnReader.java         | 38 ++++++++----
 .../sql/execution/QueryExecutionException.scala |  3 +-
 .../sql/execution/datasources/FileScanRDD.scala | 21 ++++++-
 .../parquet/ParquetSchemaSuite.scala            | 55 +++++++++++++++++
 5 files changed, 166 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94524019/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
new file mode 100644
index 0000000..82a1169
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Exception thrown when the parquet reader finds a column type mismatch.
+ */
+@InterfaceStability.Unstable
+public class SchemaColumnConvertNotSupportedException extends RuntimeException {
+
+  /**
+   * Name of the column which cannot be converted.
+   */
+  private String column;
+  /**
+   * Physical column type in the actual parquet file.
+   */
+  private String physicalType;
+  /**
+   * Logical column type in the parquet schema that the reader uses to parse all files.
+   */
+  private String logicalType;
+
+  public String getColumn() {
+    return column;
+  }
+
+  public String getPhysicalType() {
+    return physicalType;
+  }
+
+  public String getLogicalType() {
+    return logicalType;
+  }
+
+  public SchemaColumnConvertNotSupportedException(
+      String column,
+      String physicalType,
+      String logicalType) {
+    super();
+    this.column = column;
+    this.physicalType = physicalType;
+    this.logicalType = logicalType;
+  }
+}
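
Note that the exception carries no message of its own (it calls super() with no arguments); it exposes the column path and the two type names as plain strings, and FileScanRDD.scala formats the user-facing text from those accessors. A minimal sketch of constructing and inspecting it, with illustrative values:

```scala
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException

// Values are illustrative; in the reader they come from the Parquet
// ColumnDescriptor and the target WritableColumnVector.
val e = new SchemaColumnConvertNotSupportedException("[a]", "INT32", "StringType")
assert(e.getColumn == "[a]")
assert(e.getPhysicalType == "INT32")
assert(e.getLogicalType == "StringType")
assert(e.getMessage == null) // the caller is expected to build the message
```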

http://git-wip-us.apache.org/repos/asf/spark/blob/94524019/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 47dd625..72f1d02 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.TimeZone;
 
 import org.apache.parquet.bytes.BytesUtils;
@@ -31,6 +32,7 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
@@ -232,6 +234,18 @@ public class VectorizedColumnReader {
   }
 
   /**
+   * Helper function to construct exception for parquet schema mismatch.
+   */
+  private SchemaColumnConvertNotSupportedException constructConvertNotSupportedException(
+      ColumnDescriptor descriptor,
+      WritableColumnVector column) {
+    return new SchemaColumnConvertNotSupportedException(
+      Arrays.toString(descriptor.getPath()),
+      descriptor.getType().toString(),
+      column.dataType().toString());
+  }
+
+  /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
   private void decodeDictionaryIds(
@@ -261,7 +275,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -282,7 +296,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -321,7 +335,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException();
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
       case BINARY:
@@ -360,7 +374,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException();
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -375,7 +389,9 @@ public class VectorizedColumnReader {
    */
 
   private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
-    assert(column.dataType() == DataTypes.BooleanType);
+    if (column.dataType() != DataTypes.BooleanType) {
+      throw constructConvertNotSupportedException(descriptor, column);
+    }
     defColumn.readBooleans(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
@@ -394,7 +410,7 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -414,7 +430,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -425,7 +441,7 @@ public class VectorizedColumnReader {
       defColumn.readFloats(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -436,7 +452,7 @@ public class VectorizedColumnReader {
       defColumn.readDoubles(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -471,7 +487,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -510,7 +526,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94524019/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
index 16806c6..cffd97b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
@@ -17,4 +17,5 @@
 
 package org.apache.spark.sql.execution
 
-class QueryExecutionException(message: String) extends Exception(message)
+class QueryExecutionException(message: String, cause: Throwable = null)
+  extends Exception(message, cause)
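
The widened constructor is source-compatible: existing message-only call sites still compile because `cause` defaults to null, while FileScanRDD can now chain the underlying reader exception. A small sketch:

```scala
import org.apache.spark.sql.execution.QueryExecutionException

val plain = new QueryExecutionException("plain message")   // old-style call
val chained = new QueryExecutionException(
  "wrapped message", new RuntimeException("root cause"))   // new-style call

assert(plain.getCause == null)
assert(chained.getCause.getMessage == "root cause")
```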

http://git-wip-us.apache.org/repos/asf/spark/blob/94524019/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 835ce98..28c36b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -21,11 +21,14 @@ import java.io.{FileNotFoundException, IOException}
 
 import scala.collection.mutable
 
+import org.apache.parquet.io.ParquetDecodingException
+
 import org.apache.spark.{Partition => RDDPartition, TaskContext, TaskKilledException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 
@@ -179,7 +182,23 @@ class FileScanRDD(
             currentIterator = readCurrentFile()
           }
 
-          hasNext
+          try {
+            hasNext
+          } catch {
+            case e: SchemaColumnConvertNotSupportedException =>
+              val message = "Parquet column cannot be converted in " +
+                s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+                s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+              throw new QueryExecutionException(message, e)
+            case e: ParquetDecodingException =>
+              if (e.getMessage.contains("Can not read value at")) {
+                val message = "Encounter error while reading parquet files. " +
+                  "One possible cause: Parquet column cannot be converted in the " +
+                  "corresponding files. Details: "
+                throw new QueryExecutionException(message, e)
+              }
+              throw e
+          }
         } else {
           currentFile = null
           InputFileBlockHolder.unset()
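
From the caller's side, the wrapping means the offending file path now surfaces in the exception chain. A hedged sketch of inspecting it, following the same pattern as the new tests (the read path is an example, and `spark` is an assumed SparkSession):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.QueryExecutionException

try {
  spark.read.parquet("/tmp/parquet-mismatch").collect()
} catch {
  case e: SparkException if e.getCause.isInstanceOf[QueryExecutionException] =>
    // Includes the file path, column, and both types for the vectorized
    // reader, or a general hint for the non-vectorized reader.
    println(e.getCause.getMessage)
}
```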

http://git-wip-us.apache.org/repos/asf/spark/blob/94524019/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 2cd2a60..9d3dfae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.parquet.io.ParquetDecodingException
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -382,6 +385,58 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  // =======================================
+  // Tests for parquet schema mismatch error
+  // =======================================
+  def testSchemaMismatch(path: String, vectorizedReaderEnabled: Boolean): SparkException = {
+    import testImplicits._
+
+    var e: SparkException = null
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
+      // Create two parquet files with different schemas in the same folder
+      Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
+      Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(s"$path/parquet")
+
+      e = intercept[SparkException] {
+        spark.read.parquet(s"$path/parquet").collect()
+      }
+    }
+    e
+  }
+
+  test("schema mismatch failure error message for parquet reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
+      val expectedMessage = "Encounter error while reading parquet files. " +
+        "One possible cause: Parquet column cannot be converted in the corresponding " +
+        "files. Details:"
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
+      assert(e.getCause.getMessage.startsWith(expectedMessage))
+    }
+  }
+
+  test("schema mismatch failure error message for parquet vectorized reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+
+      // Check if the physical type is reporting correctly
+      val errMsg = e.getCause.getMessage
+      assert(errMsg.startsWith("Parquet column cannot be converted in file"))
+      val file = errMsg.substring("Parquet column cannot be converted in file ".length,
+        errMsg.indexOf(". "))
+      val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+      assert(col.length == 1)
+      if (col(0).dataType == StringType) {
+        assert(errMsg.contains("Column: [a], Expected: IntegerType, Found: BINARY"))
+      } else {
+        assert(errMsg.endsWith("Column: [a], Expected: StringType, Found: INT32"))
+      }
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================

