Posted to commits@airflow.apache.org by "fritz-astronomer (via GitHub)" <gi...@apache.org> on 2023/02/17 18:29:37 UTC

[GitHub] [airflow] fritz-astronomer opened a new pull request, #29599: fix do_xcom_push=False bug in SnowflakeOperator

fritz-astronomer opened a new pull request, #29599:
URL: https://github.com/apache/airflow/pull/29599

   closes: #29593
   
   Adds a guard check to short-circuit out of `_process_output` when there is no output (such as with `do_xcom_push=False`)
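
A minimal sketch of the guard described above, written as a standalone function rather than the actual provider method. The name `process_output_sketch` is hypothetical, and the sketch assumes that results arrive as `None` or `[None]` when nothing was fetched, as the guard quoted later in this thread suggests:

```python
from __future__ import annotations

from typing import Any, Sequence


def process_output_sketch(
    results: list[Any] | None,
    descriptions: list[Sequence[Sequence] | None],
) -> list[Any]:
    """Turn rows into dicts keyed by column name, tolerating missing output."""
    # Guard: nothing was fetched (for example with do_xcom_push=False),
    # so there is nothing to convert and we return early.
    if results is None or results == [None]:
        return []
    processed = []
    for result_id, rows in enumerate(results):
        description = descriptions[result_id]
        if not description:
            raise RuntimeError(f"No cursor description for query number {result_id}.")
        # The first field of each description entry is taken as the column name.
        column_names = [column[0] for column in description]
        processed.append([dict(zip(column_names, row)) for row in rows])
    return processed
```

Without the early return, the loop would try to iterate over `None`, which is the kind of failure the guard is meant to prevent.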


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bolkedebruin merged pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin merged PR #29599:
URL: https://github.com/apache/airflow/pull/29599




[GitHub] [airflow] fritz-astronomer commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "fritz-astronomer (via GitHub)" <gi...@apache.org>.
fritz-astronomer commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1438761089

   Oh! Brilliant! Just short circuit and skip all the rest. That makes sense. Not sure why I was worried that method might be needed for side-effects other than xcom




[GitHub] [airflow] fritz-astronomer commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "fritz-astronomer (via GitHub)" <gi...@apache.org>.
fritz-astronomer commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1439063340

   Did a quick rebase just to clean up commits. 
   Tests are added and passing in breeze ✅ 
   




[GitHub] [airflow] Taragolis commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1435663265

   Unfortunately the [Static Checks](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#static-code-checks) are failing, and tests are also required to avoid regressions.




[GitHub] [airflow] fritz-astronomer commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "fritz-astronomer (via GitHub)" <gi...@apache.org>.
fritz-astronomer commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1435403027

   @hussein-awala - the bug *is* specifically in the SnowflakeProvider. The base class just does a no-op with `_process_output` and isn't affected. I was hesitant to add the guard there in case the fix somehow interferes with other providers by skipping a `_process_output` that does do something with an empty result set 🤔
   
   ```
   --- a/airflow/providers/common/sql/operators/sql.py
   +++ b/airflow/providers/common/sql/operators/sql.py
   @@ -265,6 +265,9 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
                return_last=self.return_last,
                **extra_kwargs,
            )
   +        # Handle do_xcom_push=False
   +        if output is None or output == [None]:
   +            return []
            if return_single_query_results(self.sql, self.return_last, self.split_statements):
                # For simplicity, we pass always list as input to _process_output, regardless if
                # single query results are going to be returned, and we return the first element
   
   --- a/airflow/providers/snowflake/operators/snowflake.py
   +++ b/airflow/providers/snowflake/operators/snowflake.py
   @@ -108,9 +108,6 @@ class SnowflakeOperator(SQLExecuteQueryOperator):
            results: Optional[list[Any]],
            descriptions: list[Sequence[Sequence] | None]
        ) -> list[Any]:
   -        # Handle do_xcom_push=False
   -        if results is None or results == [None]:
   -            return []
            validated_descriptions: list[Sequence[Sequence]] = []
            for idx, description in enumerate(descriptions):
                if not description:
   ```




[GitHub] [airflow] hussein-awala commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1435737748

   > @hussein-awala - the bug _is_ specifically in the SnowflakeProvider. The base class just does a no-op with `_process_output` and isn't affected. I was hesitant to add the guard there in case the fix somehow interferes with other providers by skipping a `_process_output` that does do something with an empty result set 🤔
   > 
   > ```
   > --- a/airflow/providers/common/sql/operators/sql.py
   > +++ b/airflow/providers/common/sql/operators/sql.py
   > @@ -265,6 +265,9 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
   >              return_last=self.return_last,
   >              **extra_kwargs,
   >          )
   > +        # Handle do_xcom_push=False
   > +        if output is None or output == [None]:
   > +            return []
   >          if return_single_query_results(self.sql, self.return_last, self.split_statements):
   >              # For simplicity, we pass always list as input to _process_output, regardless if
   >              # single query results are going to be returned, and we return the first element
   > 
   > --- a/airflow/providers/snowflake/operators/snowflake.py
   > +++ b/airflow/providers/snowflake/operators/snowflake.py
   > @@ -108,9 +108,6 @@ class SnowflakeOperator(SQLExecuteQueryOperator):
   >          results: Optional[list[Any]],
   >          descriptions: list[Sequence[Sequence] | None]
   >      ) -> list[Any]:
   > -        # Handle do_xcom_push=False
   > -        if results is None or results == [None]:
   > -            return []
   >          validated_descriptions: list[Sequence[Sequence]] = []
   >          for idx, description in enumerate(descriptions):
   >              if not description:
   > ```
   I think adding a condition to the `SQLExecuteQueryOperator`, without changing the Snowflake operator, is enough to solve the problem: we don't need to process the result because we don't want to push it as an XCom:
   ```diff
   --- a/airflow/providers/common/sql/operators/sql.py
   +++ b/airflow/providers/common/sql/operators/sql.py
   @@ -265,6 +265,8 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
                return_last=self.return_last,
                **extra_kwargs,
            )
   +        if not self.do_xcom_push:
   +            return None
            if return_single_query_results(self.sql, self.return_last, self.split_statements):
                # For simplicity, we pass always list as input to _process_output, regardless if
                # single query results are going to be returned, and we return the first element
   ```
   What do you think?
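
The proposal above can be sketched without Airflow installed. `FakeHook` and `SketchOperator` are hypothetical stand-ins, not the real hook or `SQLExecuteQueryOperator`; the sketch only assumes that no handler is passed (and so nothing is fetched) when `do_xcom_push` is false:

```python
from __future__ import annotations

from typing import Any


class FakeHook:
    """Hypothetical stand-in for a DB hook; returns canned rows."""

    descriptions = [[("ID",)]]

    def run(self, sql: str, handler: Any = None) -> Any:
        # Without a handler nothing is fetched, mirroring the scenario in this thread.
        return [(1,)] if handler is not None else None


class SketchOperator:
    """Hypothetical operator; not the real SQLExecuteQueryOperator."""

    def __init__(self, sql: str, do_xcom_push: bool = True) -> None:
        self.sql = sql
        self.do_xcom_push = do_xcom_push
        self.process_calls = 0  # counts how often _process_output ran

    def _process_output(self, results: list[Any], descriptions: list[Any]) -> list[Any]:
        self.process_calls += 1
        return [
            [dict(zip([col[0] for col in desc], row)) for row in rows]
            for rows, desc in zip(results, descriptions)
        ]

    def execute(self, hook: FakeHook) -> Any:
        handler = list if self.do_xcom_push else None
        output = hook.run(self.sql, handler=handler)
        # Short-circuit: nothing will be pushed to XCom, so skip processing entirely.
        if not self.do_xcom_push:
            return None
        return self._process_output([output], hook.descriptions)[-1]
```

Checking the flag in `execute` means subclasses that override `_process_output` never see an empty result in this case, so they need no guard of their own.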




[GitHub] [airflow] uranusjr commented on a diff in pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29599:
URL: https://github.com/apache/airflow/pull/29599#discussion_r1113899583


##########
tests/providers/common/sql/operators/test_sql.py:
##########
@@ -89,6 +92,8 @@ def test_dont_xcom_push(self, mock_get_db_hook):
             handler=None,
             return_last=True,
         )
+        mock_process_output.assert_not_called()
+

Review Comment:
   ```suggestion
   ```
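
For readers unfamiliar with the `assert_not_called()` assertion in the diff above, here is a small sketch of the pattern using `unittest.mock`. `TinyOperator` and `check_process_output_skipped` are hypothetical and exist only for illustration; they are not the test from this PR:

```python
from unittest import mock


class TinyOperator:
    """Hypothetical operator used only to illustrate the assertion."""

    def __init__(self, do_xcom_push: bool = True) -> None:
        self.do_xcom_push = do_xcom_push

    def _process_output(self, results):
        return results

    def execute(self):
        # Skip processing when nothing will be pushed to XCom.
        if not self.do_xcom_push:
            return None
        return self._process_output([[1]])


def check_process_output_skipped() -> bool:
    # Replace _process_output with a mock for the duration of the block.
    with mock.patch.object(TinyOperator, "_process_output") as mock_process_output:
        TinyOperator(do_xcom_push=False).execute()
        # Raises AssertionError if the mocked method was called at all.
        mock_process_output.assert_not_called()
    return True
```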





[GitHub] [airflow] fritz-astronomer commented on pull request #29599: fix do_xcom_push=False bug in SnowflakeOperator

Posted by "fritz-astronomer (via GitHub)" <gi...@apache.org>.
fritz-astronomer commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1435100097

   Test DAG with the fix applied (see the linked issue for the test DAG demonstrating the error):
   ```
   from __future__ import annotations
   
   import os
   from datetime import datetime
   from typing import Optional, Any, Sequence
   
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   
   os.environ["AIRFLOW_CONN_SNOWFLAKE"] = "snowflake://.............."
   
   
   class SnowflakePatchOperator(SnowflakeOperator):
       def _process_output(
           self,
           results: Optional[list[Any]],
           descriptions: list[Sequence[Sequence] | None]
       ) -> list[Any]:
           # Handle do_xcom_push=False
           if results is None or results == [None]:
               return [None]
           validated_descriptions: list[Sequence[Sequence]] = []
           for idx, description in enumerate(descriptions):
               if not description:
                   raise RuntimeError(
                       f"The query did not return descriptions of the cursor for query number {idx}. "
                       "Cannot return values in a form of dictionary for that query."
                   )
               validated_descriptions.append(description)
           returned_results = []
           for result_id, result_list in enumerate(results):
               current_processed_result = []
               for row in result_list:
                   dict_result: dict[Any, Any] = {}
                   for idx, description in enumerate(validated_descriptions[result_id]):
                       dict_result[description[0]] = row[idx]
                   current_processed_result.append(dict_result)
               returned_results.append(current_processed_result)
           return returned_results
   
   
   
   with DAG('snowflake_test', schedule=None, start_date=datetime(2023, 1, 1)):
       SnowflakePatchOperator(
           task_id='snowflake_test',
           snowflake_conn_id="snowflake",
           sql="select 1;",
           do_xcom_push=False
       )
   ```

