You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/27 02:09:37 UTC

[airflow] branch main updated: Add custom handler param in SnowflakeOperator (#25983)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e12d483bc Add custom handler param in SnowflakeOperator (#25983)
9e12d483bc is described below

commit 9e12d483bcde714ca4225c94df182c4eacd36f7c
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Sat Aug 27 07:39:30 2022 +0530

    Add custom handler param in SnowflakeOperator (#25983)
---
 airflow/providers/snowflake/operators/snowflake.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/snowflake/operators/snowflake.py b/airflow/providers/snowflake/operators/snowflake.py
index 697cc9a344..21db6f8c8e 100644
--- a/airflow/providers/snowflake/operators/snowflake.py
+++ b/airflow/providers/snowflake/operators/snowflake.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Any, Iterable, List, Mapping, Optional, Sequence, SupportsAbs, Union
+from typing import Any, Callable, Iterable, List, Mapping, Optional, Sequence, SupportsAbs, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.common.sql.hooks.sql import fetch_all_handler
@@ -78,6 +78,8 @@ class SnowflakeOperator(BaseOperator):
         through native Okta.
     :param session_parameters: You can set session-level parameters at
         the time you connect to Snowflake
+    :param handler: A Python callable that will act on the cursor result.
+        If not provided, ``fetch_all_handler`` is used.
     """
 
     template_fields: Sequence[str] = ('sql',)
@@ -99,6 +101,7 @@ class SnowflakeOperator(BaseOperator):
         schema: Optional[str] = None,
         authenticator: Optional[str] = None,
         session_parameters: Optional[dict] = None,
+        handler: Optional[Callable] = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -114,6 +117,7 @@ class SnowflakeOperator(BaseOperator):
         self.authenticator = authenticator
         self.session_parameters = session_parameters
         self.query_ids: List[str] = []
+        self.handler = handler
 
     def get_db_hook(self) -> SnowflakeHook:
         return get_db_hook(self)
@@ -122,7 +126,8 @@ class SnowflakeOperator(BaseOperator):
         """Run query on snowflake"""
         self.log.info('Executing: %s', self.sql)
         hook = self.get_db_hook()
-        execution_info = hook.run(self.sql, self.autocommit, self.parameters, fetch_all_handler)
+        handler = self.handler or fetch_all_handler
+        execution_info = hook.run(self.sql, self.autocommit, self.parameters, handler)
         self.query_ids = hook.query_ids
 
         if self.do_xcom_push: