You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/01 01:46:38 UTC

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38223: [SPARK-40770][PYTHON] Improved error messages for applyInPandas for schema mismatch

HyukjinKwon commented on code in PR #38223:
URL: https://github.com/apache/spark/pull/38223#discussion_r1010002388


##########
python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py:
##########
@@ -165,100 +148,191 @@ def merge_pandas(lft, _):
                 )
 
     def test_apply_in_pandas_not_returning_pandas_dataframe(self):
-        left = self.data1
-        right = self.data2
+        self._test_merge_error(
+            fn=lambda lft, rgt: lft.size + rgt.size,
+            error_class=PythonException,
+            error_message_regex="Return type of the user-defined function "
+            "should be pandas.DataFrame, but is <class 'numpy.int64'>",
+        )
+
+    def test_apply_in_pandas_returning_column_names(self):
+        self._test_merge(fn=lambda lft, rgt: pd.merge(lft, rgt, on=["id", "k"]))
 
+    def test_apply_in_pandas_returning_no_column_names(self):
         def merge_pandas(lft, rgt):
-            return lft.size + rgt.size
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            res.columns = range(res.columns.size)
+            return res
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegex(
-                PythonException,
-                "Return type of the user-defined function should be pandas.DataFrame, "
-                "but is <class 'numpy.int64'>",
-            ):
-                (
-                    left.groupby("id")
-                    .cogroup(right.groupby("id"))
-                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-                    .collect()
-                )
+        self._test_merge(fn=merge_pandas)
 
-    def test_apply_in_pandas_returning_wrong_number_of_columns(self):
-        left = self.data1
-        right = self.data2
+    def test_apply_in_pandas_returning_column_names_sometimes(self):
+        def merge_pandas(lft, rgt):
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+                return res
+            res.columns = range(res.columns.size)
+            return res
+
+        self._test_merge(fn=merge_pandas)
 
+    def test_apply_in_pandas_returning_wrong_column_names(self):
         def merge_pandas(lft, rgt):
             if 0 in lft["id"] and lft["id"][0] % 2 == 0:
                 lft["add"] = 0
             if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
                 rgt["more"] = 1
             return pd.merge(lft, rgt, on=["id", "k"])
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegex(
-                PythonException,
-                "Number of columns of the returned pandas.DataFrame "
-                "doesn't match specified schema. Expected: 4 Actual: 6",
-            ):
-                (
-                    # merge_pandas returns two columns for even keys while we set schema to four
-                    left.groupby("id")
-                    .cogroup(right.groupby("id"))
-                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-                    .collect()
-                )
+        self._test_merge_error(
+            fn=merge_pandas,
+            error_class=PythonException,
+            error_message_regex="Column names of the returned pandas.DataFrame "
+            "do not match specified schema.  Unexpected: add, more  Schema: id, k, v, v2\n",
+        )
 
-    def test_apply_in_pandas_returning_empty_dataframe(self):
-        left = self.data1
-        right = self.data2
+        # with very large schema, missing and unexpected is limited to 5
+        # and the schema is abbreviated in the error message
+        schema = "id long, k long, mean double, " + ", ".join(
+            f"column_with_long_column_name_{no} integer" for no in range(35)
+        )
+        self._test_merge_error(
+            fn=lambda lft, rgt: pd.DataFrame(
+                [
+                    (
+                        lft.id,
+                        lft.k,
+                        lft.v.mean(),
+                    )
+                    + tuple(lft.v.mean() for _ in range(7))
+                ],
+                columns=["id", "k", "mean"] + [f"extra_column_{no} integer" for no in range(7)],
+            ),
+            output_schema=schema,
+            error_class=PythonException,
+            error_message_regex="Column names of the returned pandas\\.DataFrame "
+            "do not match specified schema\\.  "
+            "Missing \\(first 5 of 35\\): column_with_long_column_name_0,"
+            " column_with_long_column_name_1, column_with_long_column_name_10,"
+            " column_with_long_column_name_11, column_with_long_column_name_12  "
+            "Unexpected \\(first 5 of 7\\): extra_column_0 integer, extra_column_1 integer,"
+            " extra_column_2 integer, extra_column_3 integer, extra_column_4 integer  "
+            "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1,"
+            " column_with_long_column_name_2, column_with_long_column_name_3,"
+            " column_with_long_column_name_4, column_with_long_column_name_5,"
+            " column_with_long_column_name_6, column_with_long_column_name_7,"
+            " column_with_long_column_name_8, column_with_long_column_name_9,"
+            " column_with_long_column_name_10, column_with_long_column_name_11,"
+            " column_with_long_column_name_12, column_with_long_column_name_13,"
+            " column_with_long_column_name_14, column_with_\\.\\.\\.g_column_name_19,"
+            " column_with_long_column_name_20, column_with_long_column_name_21,"
+            " column_with_long_column_name_22, column_with_long_column_name_23,"
+            " column_with_long_column_name_24, column_with_long_column_name_25,"
+            " column_with_long_column_name_26, column_with_long_column_name_27,"
+            " column_with_long_column_name_28, column_with_long_column_name_29,"
+            " column_with_long_column_name_30, column_with_long_column_name_31,"
+            " column_with_long_column_name_32, column_with_long_column_name_33,"
+            " column_with_long_column_name_34\n",
+        )
 
+    def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
         def merge_pandas(lft, rgt):
             if 0 in lft["id"] and lft["id"][0] % 2 == 0:
-                return pd.DataFrame([])
+                lft[3] = 0
             if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
-                return pd.DataFrame([])
-            return pd.merge(lft, rgt, on=["id", "k"])
-
-        result = (
-            left.groupby("id")
-            .cogroup(right.groupby("id"))
-            .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-            .sort(["id", "k"])
-            .toPandas()
+                rgt[3] = 1
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            res.columns = range(res.columns.size)
+            return res
+
+        self._test_merge_error(
+            fn=merge_pandas,
+            error_class=PythonException,
+            error_message_regex="Number of columns of the returned pandas.DataFrame "
+            "doesn't match specified schema.  Expected: 4  Actual: 6  Schema: id, k, v, v2\n",
         )
 
-        left = left.toPandas()
-        right = right.toPandas()
-
-        expected = pd.merge(
-            left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id", "k"]
-        ).sort_values(by=["id", "k"])
+        # with very large schema the schema is abbreviated in the error message
+        schema = "id long, k long, mean double, " + ", ".join(
+            f"column_with_long_column_name_{no} integer" for no in range(35)
+        )
 
-        assert_frame_equal(expected, result)
+        def fn(lft, _):
+            # remove column names from lft DataFrame
+            lft.columns = range(lft.columns.size)
+            return lft
 
-    def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self):
-        left = self.data1
-        right = self.data2
+        self._test_merge_error(
+            fn=fn,
+            output_schema=schema,
+            error_class=PythonException,
+            error_message_regex="Number of columns of the returned pandas\\.DataFrame "
+            "doesn't match specified schema\\.  Expected: 38  Actual: 3  "
+            "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1,"
+            " column_with_long_column_name_2, column_with_long_column_name_3,"

Review Comment:
   I think regex match is fine. this is too long .e.g, just `column_with_long_column_name_.*` would be good enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org