Posted to users@airflow.apache.org by Anthony Joyce <an...@omicronmedia.com> on 2021/12/09 16:45:31 UTC

Issue with serialization

Hello fellow users-

I have encountered an error which seems to be related to serialization:

Broken DAG: [/home/etl/airflow/dags/airflow_platypus_etl_dag.py]
Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 574, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 447, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 935, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 847, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'Platypus_ETL': 'str' object has no attribute '__module__'


I have spent some time trying to figure out what is going on, but to no avail. Does anyone have insight into an error like this? I am on Airflow release 2.2.2 with the default package constraints.

Thanks all,

Anthony


Re: Issue with serialization

Posted by Anthony Joyce <an...@omicronmedia.com>.
That worked! Thanks Daniel. Your help was much appreciated.

Anthony

On Dec 9, 2021, at 12:53 PM, Daniel Standish <dp...@gmail.com> wrote:

OK this looks like an easy one to fix :)

You can't use `params` as a parameter name in your operator. It's already used by `BaseOperator` and has special handling for serialization.

So rename it to `task_params` or anything else, and you should be good.
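
For example, a minimal sketch of the rename, trimmed to the relevant pieces of the operator quoted below (`task_params` is just an illustrative name; anything that doesn't collide with `BaseOperator` works):

from typing import Dict, Optional, Union

from airflow.models import BaseOperator


class MySqlToPostgresOperator(BaseOperator):
    # renamed from "params" so it no longer shadows BaseOperator.params,
    # whose values Airflow 2.2 expects to be Param objects when serializing
    template_fields = ("sql", "postgres_table", "task_params")

    def __init__(
        self,
        sql: str,
        postgres_table: str = "",
        task_params: Optional[Dict[str, Union[str, int]]] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.sql = sql
        self.postgres_table = postgres_table
        # a plain dict under a non-reserved attribute name, so no
        # Param-specific serialization logic ever touches it
        self.task_params = task_params if task_params is not None else {}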



On Thu, Dec 9, 2021 at 9:44 AM Anthony Joyce <an...@omicronmedia.com> wrote:
Hi Daniel-

After some trial and error, I was able to isolate the issue. It has to do with my custom operator. See code below:

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from contextlib import closing
from typing import Dict, Optional, Union

class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
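        # this "params" argument shadows BaseOperator.params — the
        # collision Daniel identifies above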
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )
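
For reference, the call site boils down to something like this (hypothetical and simplified — the table name and SQL are made up, but `params=...` is passed the same way):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="Platypus_ETL",
    start_date=datetime(2021, 12, 1),
    schedule_interval=None,
) as dag:
    copy_table = MySqlToPostgresOperator(
        task_id="copy_some_table",
        sql="SELECT * FROM some_table WHERE updated_at >= %(since)s",
        postgres_table="some_table",
        # plain str values end up in BaseOperator.params, which the
        # serializer expects to hold Param objects -> AttributeError
        params={"since": "2021-01-01"},
    )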


Thanks,

Anthony


On Dec 9, 2021, at 11:52 AM, Daniel Standish <dp...@gmail.com> wrote:

Can you provide a DAG (as simplified as possible) that we can use to reproduce this error?


