Posted to commits@spark.apache.org by yh...@apache.org on 2016/01/25 04:40:39 UTC

spark git commit: [SPARK-12624][PYSPARK] Checks row length when converting Java arrays to Python rows

Repository: spark
Updated Branches:
  refs/heads/master e789b1d2c -> 3327fd281


[SPARK-12624][PYSPARK] Checks row length when converting Java arrays to Python rows

When the actual row length doesn't match the number of fields in the specified schema, we should give a better error message instead of throwing an unintuitive `ArrayIndexOutOfBoundsException`.

Author: Cheng Lian <li...@databricks.com>

Closes #10886 from liancheng/spark-12624.
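
For context, a minimal sketch that reproduces the mismatch from the Python side. It assumes the Spark 1.6-era SQLContext API; the local SparkContext setup here is illustrative only.

    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    sc = SparkContext("local", "schema-mismatch-demo")
    sqlContext = SQLContext(sc)

    # Each Row carries a single value, but the schema declares two fields.
    rdd = sc.parallelize(range(3)).map(lambda i: Row(a=i))
    schema = StructType([
        StructField("a", IntegerType()),
        StructField("b", StringType()),
    ])

    # createDataFrame is lazy; the mismatch only surfaces at an action.
    df = sqlContext.createDataFrame(rdd, schema)
    df.show()  # With this patch, fails with the descriptive message
               # "Input row doesn't have expected number of values required
               # by the schema. 2 fields are required while 1 values are
               # provided." instead of an ArrayIndexOutOfBoundsException.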


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3327fd28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3327fd28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3327fd28

Branch: refs/heads/master
Commit: 3327fd28170b549516fee1972dc6f4c32541591b
Parents: e789b1d
Author: Cheng Lian <li...@databricks.com>
Authored: Sun Jan 24 19:40:34 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Jan 24 19:40:34 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                                 | 9 +++++++++
 .../main/scala/org/apache/spark/sql/execution/python.scala  | 9 ++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3327fd28/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ae86202..7593b99 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -364,6 +364,15 @@ class SQLTests(ReusedPySparkTestCase):
         df3 = self.sqlCtx.createDataFrame(rdd, df.schema)
         self.assertEqual(10, df3.count())
 
+    def test_create_dataframe_schema_mismatch(self):
+        input = [Row(a=1)]
+        rdd = self.sc.parallelize(range(3)).map(lambda i: Row(a=i))
+        schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
+        df = self.sqlCtx.createDataFrame(rdd, schema)
+        message = ".*Input row doesn't have expected number of values required by the schema.*"
+        with self.assertRaisesRegexp(Exception, message):
+            df.show()
+
     def test_serialize_nested_array_and_map(self):
         d = [Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})]
         rdd = self.sc.parallelize(d)

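Note that the test wraps df.show() rather than the createDataFrame call in assertRaisesRegexp: DataFrame construction is lazy, so the row/schema mismatch only surfaces when an action materializes the data. To run the test locally against a checkout of this era, Spark's Python test runner can target the SQL module (e.g. ./python/run-tests --modules=pyspark-sql; runner flags may differ between branches).
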
http://git-wip-us.apache.org/repos/asf/spark/blob/3327fd28/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index 41e35fd..e3a016e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -220,7 +220,14 @@ object EvaluatePython {
       ArrayBasedMapData(keys, values)
 
     case (c, StructType(fields)) if c.getClass.isArray =>
-      new GenericInternalRow(c.asInstanceOf[Array[_]].zip(fields).map {
+      val array = c.asInstanceOf[Array[_]]
+      if (array.length != fields.length) {
+        throw new IllegalStateException(
+          s"Input row doesn't have expected number of values required by the schema. " +
+          s"${fields.length} fields are required while ${array.length} values are provided."
+        )
+      }
+      new GenericInternalRow(array.zip(fields).map {
         case (e, f) => fromJava(e, f.dataType)
       })
 
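The Scala change itself is a fail-fast guard: validate the array length against the schema before zipping values with fields, since Scala's zip silently truncates to the shorter collection and defers the failure to an opaque exception later. A simplified Python analogue of the same pattern (the Field tuple and identity converter here are hypothetical, for illustration only):

    from collections import namedtuple

    Field = namedtuple("Field", ["name", "dataType"])

    def convert_row(values, fields, convert=lambda value, data_type: value):
        # Fail fast with a descriptive error instead of letting zip silently
        # truncate or downstream indexing throw an opaque exception.
        if len(values) != len(fields):
            raise ValueError(
                "Input row doesn't have expected number of values required "
                "by the schema. %d fields are required while %d values are "
                "provided." % (len(fields), len(values)))
        return tuple(convert(v, f.dataType) for v, f in zip(values, fields))

    fields = [Field("a", "int"), Field("b", "string")]
    convert_row((1, "x"), fields)  # -> (1, 'x')
    convert_row((1,), fields)      # raises ValueError with the message above
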

