You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/08 19:10:34 UTC

spark git commit: [SPARK-20976][SQL] Unify Error Messages for FAILFAST mode

Repository: spark
Updated Branches:
  refs/heads/master 55b8cfe6e -> 1a527bde4


[SPARK-20976][SQL] Unify Error Messages for FAILFAST mode

### What changes were proposed in this pull request?
Before 2.2, the error message indicated that the job was terminated because of `FAILFAST` mode, for example:
```
Malformed line in FAILFAST mode: {"a":{, b:3}
```
If possible, we should keep this behavior. This PR unifies the error messages raised in `FAILFAST` mode so that they all report the parse mode.

### How was this patch tested?
Modified the existing test cases in `JsonSuite` to check the new error messages.

Author: Xiao Li <ga...@gmail.com>

Closes #18196 from gatorsmile/messFailFast.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a527bde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a527bde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a527bde

Branch: refs/heads/master
Commit: 1a527bde49753535e6b86c18751f50c19a55f0d0
Parents: 55b8cfe
Author: Xiao Li <ga...@gmail.com>
Authored: Thu Jun 8 12:10:31 2017 -0700
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 8 12:10:31 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/json/JacksonParser.scala |  2 +-
 .../datasources/FailureSafeParser.scala         |  4 +++-
 .../datasources/json/JsonInferSchema.scala      |  9 ++++++---
 .../execution/datasources/json/JsonSuite.scala  | 20 +++++++++++---------
 4 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 4ed6728..bd144c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -278,7 +278,7 @@ class JacksonParser(
       // We cannot parse this token based on the given data type. So, we throw a
       // RuntimeException and this exception will be caught by `parse` method.
       throw new RuntimeException(
-        s"Failed to parse a value for data type $dataType (current token: $token).")
+        s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 159aef2..43591a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util._
@@ -65,7 +66,8 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          throw e.cause
+          throw new SparkException("Malformed records are detected in record parsing. " +
+            s"Parse Mode: ${FailFastMode.name}.", e.cause)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index fb632cf..a270a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -21,6 +21,7 @@ import java.util.Comparator
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
@@ -61,7 +62,8 @@ private[sql] object JsonInferSchema {
             case DropMalformedMode =>
               None
             case FailFastMode =>
-              throw e
+              throw new SparkException("Malformed records are detected in schema inference. " +
+                s"Parse Mode: ${FailFastMode.name}.", e)
           }
         }
       }
@@ -231,8 +233,9 @@ private[sql] object JsonInferSchema {
 
     case FailFastMode =>
       // If `other` is not struct type, consider it as malformed one and throws an exception.
-      throw new RuntimeException("Failed to infer a common schema. Struct types are expected" +
-        s" but ${other.catalogString} was found.")
+      throw new SparkException("Malformed records are detected in schema inference. " +
+        s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " +
+        s"Struct types are expected, but `${other.catalogString}` was found.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1a527bde/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index e66a60d..65472cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Corrupt records: FAILFAST mode") {
-    val schema = StructType(
-        StructField("a", StringType, true) :: Nil)
     // `FAILFAST` mode should throw an exception for corrupt records.
     val exceptionOne = intercept[SparkException] {
       spark.read
         .option("mode", "FAILFAST")
         .json(corruptRecords)
-    }
-    assert(exceptionOne.getMessage.contains("JsonParseException"))
+    }.getMessage
+    assert(exceptionOne.contains(
+      "Malformed records are detected in schema inference. Parse Mode: FAILFAST."))
 
     val exceptionTwo = intercept[SparkException] {
       spark.read
         .option("mode", "FAILFAST")
-        .schema(schema)
+        .schema("a string")
         .json(corruptRecords)
         .collect()
-    }
-    assert(exceptionTwo.getMessage.contains("JsonParseException"))
+    }.getMessage
+    assert(exceptionTwo.contains(
+      "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
   }
 
   test("Corrupt records: DROPMALFORMED mode") {
@@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           .option("mode", "FAILFAST")
           .json(path)
       }
-      assert(exceptionOne.getMessage.contains("Failed to infer a common schema"))
+      assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " +
+        "inference. Parse Mode: FAILFAST."))
 
       val exceptionTwo = intercept[SparkException] {
         spark.read
@@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           .json(path)
           .collect()
       }
-      assert(exceptionTwo.getMessage.contains("Failed to parse a value"))
+      assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " +
+        "parsing. Parse Mode: FAILFAST."))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org