You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/08 19:10:34 UTC
spark git commit: [SPARK-20976][SQL] Unify Error Messages for
FAILFAST mode
Repository: spark
Updated Branches:
refs/heads/master 55b8cfe6e -> 1a527bde4
[SPARK-20976][SQL] Unify Error Messages for FAILFAST mode
### What changes were proposed in this pull request?
Before 2.2, the error message indicated that the job was terminated because of `FAILFAST` mode:
```
Malformed line in FAILFAST mode: {"a":{, b:3}
```
We should keep this indication where possible. This PR unifies the `FAILFAST` error messages so that they state the parse mode.
### How was this patch tested?
Updated the existing test cases in `JsonSuite` to check the new error messages.
Author: Xiao Li <ga...@gmail.com>
Closes #18196 from gatorsmile/messFailFast.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a527bde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a527bde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a527bde
Branch: refs/heads/master
Commit: 1a527bde49753535e6b86c18751f50c19a55f0d0
Parents: 55b8cfe
Author: Xiao Li <ga...@gmail.com>
Authored: Thu Jun 8 12:10:31 2017 -0700
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 8 12:10:31 2017 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/json/JacksonParser.scala | 2 +-
.../datasources/FailureSafeParser.scala | 4 +++-
.../datasources/json/JsonInferSchema.scala | 9 ++++++---
.../execution/datasources/json/JsonSuite.scala | 20 +++++++++++---------
4 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 4ed6728..bd144c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -278,7 +278,7 @@ class JacksonParser(
// We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by `parse` method.
throw new RuntimeException(
- s"Failed to parse a value for data type $dataType (current token: $token).")
+ s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 159aef2..43591a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util._
@@ -65,7 +66,8 @@ class FailureSafeParser[IN](
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
- throw e.cause
+ throw new SparkException("Malformed records are detected in record parsing. " +
+ s"Parse Mode: ${FailFastMode.name}.", e.cause)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index fb632cf..a270a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -21,6 +21,7 @@ import java.util.Comparator
import com.fasterxml.jackson.core._
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
@@ -61,7 +62,8 @@ private[sql] object JsonInferSchema {
case DropMalformedMode =>
None
case FailFastMode =>
- throw e
+ throw new SparkException("Malformed records are detected in schema inference. " +
+ s"Parse Mode: ${FailFastMode.name}.", e)
}
}
}
@@ -231,8 +233,9 @@ private[sql] object JsonInferSchema {
case FailFastMode =>
// If `other` is not struct type, consider it as malformed one and throws an exception.
- throw new RuntimeException("Failed to infer a common schema. Struct types are expected" +
- s" but ${other.catalogString} was found.")
+ throw new SparkException("Malformed records are detected in schema inference. " +
+ s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " +
+ s"Struct types are expected, but `${other.catalogString}` was found.")
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index e66a60d..65472cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("Corrupt records: FAILFAST mode") {
- val schema = StructType(
- StructField("a", StringType, true) :: Nil)
// `FAILFAST` mode should throw an exception for corrupt records.
val exceptionOne = intercept[SparkException] {
spark.read
.option("mode", "FAILFAST")
.json(corruptRecords)
- }
- assert(exceptionOne.getMessage.contains("JsonParseException"))
+ }.getMessage
+ assert(exceptionOne.contains(
+ "Malformed records are detected in schema inference. Parse Mode: FAILFAST."))
val exceptionTwo = intercept[SparkException] {
spark.read
.option("mode", "FAILFAST")
- .schema(schema)
+ .schema("a string")
.json(corruptRecords)
.collect()
- }
- assert(exceptionTwo.getMessage.contains("JsonParseException"))
+ }.getMessage
+ assert(exceptionTwo.contains(
+ "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
}
test("Corrupt records: DROPMALFORMED mode") {
@@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.option("mode", "FAILFAST")
.json(path)
}
- assert(exceptionOne.getMessage.contains("Failed to infer a common schema"))
+ assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " +
+ "inference. Parse Mode: FAILFAST."))
val exceptionTwo = intercept[SparkException] {
spark.read
@@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.json(path)
.collect()
}
- assert(exceptionTwo.getMessage.contains("Failed to parse a value"))
+ assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " +
+ "parsing. Parse Mode: FAILFAST."))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org