Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2022/12/09 00:37:00 UTC
[jira] [Commented] (SPARK-41125) Simple call to createDataFrame fails with PicklingError but only on python3.11
[ https://issues.apache.org/jira/browse/SPARK-41125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645040#comment-17645040 ]
Dongjoon Hyun commented on SPARK-41125:
---------------------------------------
Hi, [~jamet].
I believe this issue is resolved in the master branch via SPARK-40991 and SPARK-41454.
> Simple call to createDataFrame fails with PicklingError but only on python3.11
> ------------------------------------------------------------------------------
>
> Key: SPARK-41125
> URL: https://issues.apache.org/jira/browse/SPARK-41125
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.3.1
> Reporter: Jamie
> Priority: Minor
> Attachments: screenshot-1.png
>
>
> I am using Python's pytest library to write unit tests for a PySpark library I am building. pytest has a popular feature called fixtures, which lets you write reusable preparation steps for your tests. I have a simple fixture that creates a pyspark.sql.DataFrame; it works on Python 3.7, 3.8, 3.9 and 3.10, but fails on Python 3.11.
> The failing code is in a fixture called {{dataframe_of_purchases}}. Here is my fixture code:
> {code:python}
> from decimal import Decimal
>
> import pytest
> from pyspark.sql import DataFrame, SparkSession
> from pyspark.sql.types import (
>     DecimalType,
>     IntegerType,
>     StringType,
>     StructField,
>     StructType,
> )
>
>
> @pytest.fixture(scope="session")
> def purchases_schema():
>     return StructType(
>         [
>             StructField("Customer", StringType(), True),
>             StructField("Store", StringType(), True),
>             StructField("Channel", StringType(), True),
>             StructField("Product", StringType(), True),
>             StructField("Quantity", IntegerType(), True),
>             StructField("Basket", StringType(), True),
>             StructField("GrossSpend", DecimalType(10, 2), True),
>         ]
>     )
>
>
> @pytest.fixture(scope="session")
> def dataframe_of_purchases(purchases_schema) -> DataFrame:
>     spark = SparkSession.builder.getOrCreate()
>     return spark.createDataFrame(
>         data=[
>             ("Leia", "Hammersmith", "Instore", "Cheddar", 2, "Basket1", Decimal(2.50))
>         ],
>         schema=purchases_schema,
>     )
> {code}
> This code can be seen here: [https://github.com/jamiekt/jstark/blob/9e1d0e654195932a0765f66db6c8359ed8b60a3b/tests/conftest.py]
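A side note on the {{Decimal(2.50)}} literal in the fixture above (an observation of mine, not part of the original report): constructing a Decimal from a float goes through binary floating point first, so most fractional values arrive inexact, and only the string form preserves the intended two-digit scale. A quick illustration with the standard decimal module:

```python
from decimal import Decimal

# A float literal is converted to binary floating point before Decimal
# sees it, so most fractional values arrive inexact:
print(Decimal(0.1))     # 0.1000000000000000055511151231257827021181583404541015625
print(Decimal("0.1"))   # 0.1

# 2.50 happens to be exactly representable in binary, so the fixture is
# safe here, but only the string form keeps the two-digit scale:
print(Decimal(2.50))    # 2.5
print(Decimal("2.50"))  # 2.50
```

Writing {{Decimal("2.50")}} in the fixture would make the value independent of float rounding and match the declared DecimalType(10, 2) scale explicitly.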
> The tests run in a GitHub Actions CI pipeline against many versions of Python on Linux, Windows and macOS. They fail only on Python 3.11, and there they fail on every platform:
> !screenshot-1.png!
> This run can be seen at: https://github.com/jamiekt/jstark/actions/runs/3457011099
> The error is
> {quote}_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range{quote}
> The full stacktrace is:
>
> {code}
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/sql/session.py:894: in createDataFrame
>     return self._create_dataframe(
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/sql/session.py:938: in _create_dataframe
>     jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/rdd.py:3113: in _to_java_object_rdd
>     return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/rdd.py:3505: in _jrdd
>     wrapped_func = _wrap_function(
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/rdd.py:3362: in _wrap_function
>     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
> ../../../.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/rdd.py:3345: in _prepare_for_python_RDD
>     pickled_command = ser.dumps(command)
> Traceback (most recent call last):
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
>     return cloudpickle.dumps(obj, pickle_protocol)
>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
>     cp.dump(obj)
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
>     return Pickler.dump(self, obj)
>            ^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
>     return self._function_reduce(obj)
>            ^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
>     return self._dynamic_function_reduce(obj)
>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 546, in _dynamic_function_reduce
>     state = _function_getstate(func)
>             ^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 157, in _function_getstate
>     f_globals_ref = _extract_code_globals(func.__code__)
>                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle.py", line 334, in _extract_code_globals
>     out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
>                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/home/runner/.local/share/hatch/env/virtual/jstark/fjzPEUEi/jstark/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle.py", line 334, in <dictcomp>
>     out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
>                  ~~~~~^^^^^^^
> IndexError: tuple index out of range
> {code}
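For context on the final frame: the IndexError comes from cloudpickle's {{_extract_code_globals}}, which indexes {{co_names}} with the raw oparg of each global-load instruction. A likely cause (my reading, not stated in the report) is CPython 3.11's change to the LOAD_GLOBAL oparg encoding: the low bit is now a flag, so the real {{co_names}} index is {{oparg >> 1}}, and indexing with the raw oparg can run past the end of the tuple. A minimal sketch of version-safe decoding using only the standard dis module (the function names here are illustrative, not cloudpickle's actual code):

```python
import dis
import sys


def load_global_names(func):
    """Return the set of names a function loads via LOAD_GLOBAL.

    dis.get_instructions decodes the oparg for us: on CPython 3.11+
    the LOAD_GLOBAL oparg carries a flag in its low bit, so the real
    co_names index is oparg >> 1. Indexing co_names with the raw
    oparg -- as the vendored cloudpickle in the traceback does -- can
    raise IndexError: tuple index out of range on 3.11.
    """
    names = set()
    for instr in dis.get_instructions(func.__code__):
        if instr.opname == "LOAD_GLOBAL":
            names.add(instr.argval)  # argval is already the decoded name
    return names


def example():
    return len(str(sys.maxsize))


print(sorted(load_global_names(example)))  # ['len', 'str', 'sys']
```

This is consistent with the fix landing via a cloudpickle upgrade (SPARK-40991) rather than a change in PySpark itself.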
> This is not a huge blocker for me; however, I assume it will be for someone at some point, so I thought it prudent to report it here.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org