Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/25 13:22:42 UTC

[GitHub] [airflow] mik-laj opened a new pull request, #24652: Add @task.snowpark decorator

mik-laj opened a new pull request, #24652:
URL: https://github.com/apache/airflow/pull/24652

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r906792813


##########
tests/providers/snowflake/decorators/test_snowpark.py:
##########
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.utils import timezone
+
+try:
+    import snowflake.snowpark as snowpark
+    from snowflake.snowpark import Session
+except ImportError:
+    snowpark = None
+
+DEFAULT_DATE = timezone.datetime(2021, 9, 1)
+
+
+class TestDockerDecorator:
+    @unittest.skipIf(snowpark is None, 'snowpark package not present')

Review Comment:
   You can use `pytest.mark.skipif` instead, which does work on classes.
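A minimal sketch of the class-level form, assuming the same optional-import guard as the test module. The class name `TestSnowparkDecorator` and the placeholder test are hypothetical. `pytest.mark.skipif` records a mark on the class that pytest applies to every test method collected from it, so the condition is written only once:

```python
from __future__ import annotations

import pytest

try:
    import snowflake.snowpark as snowpark
except ImportError:
    snowpark = None


# One mark on the class covers every test method collected from it.
@pytest.mark.skipif(snowpark is None, reason="snowpark package not present")
class TestSnowparkDecorator:
    def test_snowpark_is_importable(self):
        # Hypothetical placeholder test; it only runs when snowpark is installed.
        assert snowpark is not None
```

With snowpark missing, pytest reports each test in the class as skipped instead of running it.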





[GitHub] [airflow] dstandish commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1021115584


##########
tests/system/providers/snowflake/example_snowflake_snowpark.py:
##########
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example use of Snowflake Snowpark related decorator.
+"""
+from __future__ import annotations
+
+import sys
+
+import pytest
+
+if not sys.version_info[0:2] == (3, 8):
+    pytest.skip("unsupported python version", allow_module_level=True)
+from datetime import datetime
+from random import uniform
+
+import snowflake.snowpark
+from snowflake.snowpark.functions import col, count, lit, random as spRandom, sproc, uniform as spUniform
+from snowflake.snowpark.types import FloatType
+
+from airflow import DAG, AirflowException
+from airflow.decorators import task
+
+SNOWFLAKE_CONN_ID = "my_snowflake_conn"
+DAG_ID = "example_snowflake_snowpark"
+
+with DAG(
+    DAG_ID,
+    start_date=datetime(2021, 1, 1),
+    schedule_interval="@once",

Review Comment:
   ```suggestion
       schedule="@once",
   ```



##########
airflow/providers/snowflake/decorators/snowpark.py:
##########
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable, Sequence
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.exceptions import AirflowException
+from airflow.operators.python import PythonOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+
+if TYPE_CHECKING:
+    from airflow.decorators.base import TaskDecorator
+
+try:
+    import snowflake.snowpark  # noqa
+except ImportError:
+    raise AirflowException(
+        "The snowflake-snowpark-python package is not installed. Make sure you are using Python 3.8."
+    )
+
+
+class _SnowparkDecoratedOperator(DecoratedOperator, PythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param snowflake_conn_id: Reference to
+        :ref:`Snowflake connection id<howto/connection:snowflake>`
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :param warehouse: name of warehouse (will overwrite any warehouse defined in the connection's extra JSON)
+    :param database: name of database (will overwrite database defined in connection)
+    :param schema: name of schema (will overwrite schema defined in connection)
+    :param role: name of role (will overwrite any role defined in connection's extra JSON)
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :param session_parameters: You can set session-level parameters at the time you connect to Snowflake
+    :param python_callable: A reference to an object that is callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated)
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    """
+
+    custom_operator_name = "@task.snowpark"
+
+    template_fields: Sequence[str] = ("op_args", "op_kwargs")
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ("python_callable",)
+
+    def __init__(
+        self,
+        *,
+        snowflake_conn_id: str = "snowflake_default",
+        parameters: dict | None = None,
+        warehouse: str | None = None,
+        database: str | None = None,
+        role: str | None = None,
+        schema: str | None = None,
+        authenticator: str | None = None,
+        session_parameters: dict | None = None,
+        python_callable,
+        op_args,
+        op_kwargs: dict,
+        **kwargs,
+    ) -> None:
+        self.snowflake_conn_id = snowflake_conn_id
+        self.parameters = parameters
+        self.warehouse = warehouse
+        self.database = database
+        self.role = role
+        self.schema = schema
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+        kwargs_to_upstream = {
+            "python_callable": python_callable,
+            "op_args": op_args,
+            "op_kwargs": op_kwargs,
+        }
+        super().__init__(
+            kwargs_to_upstream=kwargs_to_upstream,
+            python_callable=python_callable,
+            op_args=op_args,
+            # airflow.decorators.base.DecoratedOperator checks if the functions are bindable, so we have to
+            # add an artificial value to pass the validation. The real value is determined at runtime.
+            op_kwargs={**op_kwargs, "snowpark_session": None},
+            **kwargs,
+        )
+
+    def execute_callable(self):
+        hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            parameters=self.parameters,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            schema=self.schema,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        snowpark_session = hook.get_snowpark_session()
+        try:
+            op_kwargs = dict(self.op_kwargs)
+            # Set the real session as an argument to the function.
+            op_kwargs["snowpark_session"] = snowpark_session
+            return self.python_callable(*self.op_args, **op_kwargs)
+        finally:
+            snowpark_session.close()

Review Comment:
   ```suggestion
           with hook.get_snowpark_session() as snowpark_session:
               op_kwargs = dict(self.op_kwargs)
               # Set the real session as an argument to the function.
               op_kwargs["snowpark_session"] = snowpark_session
               return self.python_callable(*self.op_args, **op_kwargs)
   ```
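A minimal stdlib sketch of the flow this suggestion aims for: open the session in a `with` block, inject it into a copy of the stored kwargs, and pass that copy (not the stored dict) to the callable. `FakeSession` and `execute_callable` are hypothetical stand-ins, and the sketch assumes, as the suggestion does, that the session closes itself when the `with` block exits:

```python
from __future__ import annotations

from typing import Any, Callable


class FakeSession:
    """Hypothetical stand-in for a Snowpark session."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> FakeSession:
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the with-block closes the session, even on error.
        self.close()

    def close(self) -> None:
        self.closed = True


def execute_callable(
    python_callable: Callable[..., Any],
    op_args: tuple,
    op_kwargs: dict,
    make_session: Callable[[], FakeSession],
) -> Any:
    with make_session() as snowpark_session:
        # Copy, so the stored kwargs (holding the None placeholder) stay untouched.
        call_kwargs = dict(op_kwargs)
        call_kwargs["snowpark_session"] = snowpark_session
        # Pass the copy; passing the stored dict would hand the callable None.
        return python_callable(*op_args, **call_kwargs)
```

Because the `with` statement closes the session on the way out, this replaces the explicit `try`/`finally` while keeping the same cleanup guarantee.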





[GitHub] [airflow] uranusjr commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1021049915


##########
Dockerfile:
##########
@@ -1234,8 +1234,10 @@ ARG ADDITIONAL_PYTHON_DEPS=""
 # https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
 # * authlib, gcloud_aio_auth, adal are needed to generate constraints for PyPI packages and can be removed after we release
 #   new google, azure providers
+# * cloudpickle==2.0.0 is required by snowflake-snowpark-python, which conflicts with new versions of apache beam, so
+#   we pin pinning apache-beam also to the latest version which uses cloudpickle==2.0.0.

Review Comment:
   ```suggestion
   #   we pin apache-beam also to the latest version which uses cloudpickle==2.0.0.
   ```
   
   typo





[GitHub] [airflow] sfc-gh-madkins commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1283075152

   @mik-laj bump




[GitHub] [airflow] dstandish commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1287928113

   Maybe we could get rid of the system tests for now and add them later; they aren't strictly required. WDYT, @mik-laj?




[GitHub] [airflow] turbaszek commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r906770233


##########
tests/providers/snowflake/decorators/test_snowpark.py:
##########
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.utils import timezone
+
+try:
+    import snowflake.snowpark as snowpark
+    from snowflake.snowpark import Session
+except ImportError:
+    snowpark = None
+
+DEFAULT_DATE = timezone.datetime(2021, 9, 1)
+
+
+class TestDockerDecorator:
+    @unittest.skipIf(snowpark is None, 'snowpark package not present')

Review Comment:
   Shouldn't we apply this decorator to the class? That way we don't have to duplicate the code.





[GitHub] [airflow] sfc-gh-madkins commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1354269088

   Hey Michael -- Give me a few chances to hunt down the answer to this.
   
   On Mon, Dec 12, 2022 at 4:04 AM mpgreg ***@***.***> wrote:
   
   > @sfc-gh-madkins <https://github.com/sfc-gh-madkins> any thoughts from eng
   > on why the older version dep for Apache Beam and if/when they would bump
   > that up?
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/airflow/pull/24652#issuecomment-1346202165>.
   




[GitHub] [airflow] github-actions[bot] closed pull request #24652: Add ``@task.snowpark`` decorator

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #24652: Add ``@task.snowpark`` decorator
URL: https://github.com/apache/airflow/pull/24652






[GitHub] [airflow] potiuk commented on a diff in pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1022071874


##########
dev/breeze/SELECTIVE_CHECKS.md:
##########
@@ -77,7 +78,8 @@ The logic implements the following rules:
 * `Full tests` mode is enabled when the event is PUSH, or SCHEDULE or when "full tests needed" label is set.
   That enables all matrix combinations of variables, and all possible tests
 * Python, Kubernetes, Backend, Kind, Helm versions are limited to "defaults" only unless `Full tests` mode
-  is enabled.
+  is enabled. Tests for Snowflake are always performed on Python 3.8 as well, as `snowflake-snowpark-python`

Review Comment:
   I think (similarly to above) this is wrong. If snowpark does not work on any Python version other than 3.8, then we should not add it as a community provider. We currently have NO providers that work only on a specific version of Python. A much better solution would be for Snowflake to release their own "snowpark" provider, available only on 3.8 and limiting beam and cloudpickle, rather than trying to make it part of the snowflake provider. It COULD be an optional feature of the snowflake provider (with some way of detecting when it is used outside of it), but then it should not impose limits on other providers that use the same dependencies, I think.
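One possible reading of the "optional feature with detection" idea, sketched with the standard library only: at use time, check whether the optional package can be located and, optionally, whether the interpreter version is supported. If not, raise a dedicated exception. `OptionalFeatureUnavailable`, `is_importable` and `require_optional` are hypothetical names. In the provider, the exception would presumably be `AirflowOptionalProviderFeatureException`, as in the decorator module's import guard.

```python
from __future__ import annotations

import importlib.util
import sys


class OptionalFeatureUnavailable(Exception):
    """Hypothetical stand-in for AirflowOptionalProviderFeatureException."""


def is_importable(module_name: str) -> bool:
    """Return True if module_name can be located (parents of a dotted name get imported)."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # For dotted names, find_spec imports the parent package first;
        # a missing parent surfaces as ModuleNotFoundError.
        return False


def require_optional(module_name: str, python_version: tuple[int, int] | None = None) -> None:
    """Raise OptionalFeatureUnavailable unless the optional dependency is usable."""
    if python_version is not None and sys.version_info[:2] != python_version:
        raise OptionalFeatureUnavailable(
            f"{module_name} requires Python {python_version[0]}.{python_version[1]}"
        )
    if not is_importable(module_name):
        raise OptionalFeatureUnavailable(f"{module_name} is not installed")
```

For example, `require_optional("snowflake.snowpark", python_version=(3, 8))` placed on the decorator's execution path would fail only the code paths that actually use the feature, instead of constraining the whole provider.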





[GitHub] [airflow] uranusjr commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1314110767

   Some provider tests are also failing though, not sure if they are related.




[GitHub] [airflow] uranusjr commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r954588592


##########
airflow/providers/snowflake/decorators/snowpark.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Callable, Dict, Optional, Sequence
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import PythonOperator
+from airflow.providers.snowflake.operators.snowflake import get_db_hook
+
+if TYPE_CHECKING:
+    import snowflake
+
+    from airflow.decorators.base import TaskDecorator
+
+try:
+    import snowflake.snowpark  # noqa
+except ImportError as e:
+    # Snowpark is an optional feature and if imports are missing, it should be silently ignored
+    # As of Airflow 2.3 and above, the operator can raise AirflowOptionalProviderFeatureException
+    try:
+        from airflow.exceptions import AirflowOptionalProviderFeatureException
+    except ImportError:
+        # However, in order to keep backwards-compatibility with Airflow 2.1 and 2.2, if the
+        # 2.3 exception cannot be imported, the original ImportError should be raised.
+        # This try/except can be removed when the provider depends on Airflow >= 2.3.0
+        raise e from None
+    raise AirflowOptionalProviderFeatureException(e)
+
+
+class _SnowparkDecoratedOperator(DecoratedOperator, PythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param snowflake_conn_id: Reference to
+        :ref:`Snowflake connection id<howto/connection:snowflake>`
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :param warehouse: name of warehouse (will overwrite any warehouse defined in the connection's extra JSON)
+    :param database: name of database (will overwrite database defined in connection)
+    :param schema: name of schema (will overwrite schema defined in connection)
+    :param role: name of role (will overwrite any role defined in connection's extra JSON)
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :param session_parameters: You can set session-level parameters at the time you connect to Snowflake
+    :param python_callable: A reference to an object that is callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated)
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        *,
+        snowflake_conn_id: str = 'snowflake_default',
+        parameters: Optional[dict] = None,
+        warehouse: Optional[str] = None,
+        database: Optional[str] = None,
+        role: Optional[str] = None,
+        schema: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        python_callable,
+        op_args,
+        op_kwargs: Dict,
+        **kwargs,
+    ) -> None:
+        self.snowflake_conn_id = snowflake_conn_id
+        self.parameters = parameters
+        self.warehouse = warehouse
+        self.database = database
+        self.role = role
+        self.schema = schema
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+        kwargs_to_upstream = {
+            "python_callable": python_callable,
+            "op_args": op_args,
+            "op_kwargs": op_kwargs,
+        }
+        super().__init__(
+            kwargs_to_upstream=kwargs_to_upstream,
+            python_callable=python_callable,
+            op_args=op_args,
+            # airflow.decorators.base.DecoratedOperator checks if the functions are bindable, so we have to
+            # add an artificial value to pass the validations. The real value is determined at runtime.
+            op_kwargs={**op_kwargs, 'snowpark_session': None},
+            **kwargs,
+        )
+
+    def execute_callable(self):
+        hook = get_db_hook(self)
+        snowpark_session = hook.get_snowpark_session()
+        return self.python_callable(*self.op_args, **{**self.op_kwargs, 'snowpark_session': snowpark_session})

Review Comment:
   ```suggestion
           return self.python_callable(*self.op_args, **self.op_kwargs, snowpark_session=snowpark_session)
   ```
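The original call (`**{**self.op_kwargs, 'snowpark_session': ...}`) and the suggested form (`**self.op_kwargs, snowpark_session=...`) behave differently in one case. If the unpacked dict still holds a `snowpark_session` key at execution time, such as the `None` placeholder injected in `__init__`, the dict merge lets the later key win, but a separate keyword makes Python reject the duplicate. A minimal stdlib sketch with a hypothetical task function:

```python
from __future__ import annotations

from typing import Any


def user_task(x: int, snowpark_session: Any = None) -> Any:
    # Hypothetical task body: hand back whatever session it received.
    return snowpark_session


stored_kwargs = {"x": 1, "snowpark_session": None}  # placeholder still present
real_session = object()  # hypothetical stand-in for a Snowpark session

# Original form: later keys in a dict display win, so the placeholder is replaced.
merged_result = user_task(**{**stored_kwargs, "snowpark_session": real_session})

# Suggested form: the same keyword arrives twice, which Python rejects.
try:
    user_task(**stored_kwargs, snowpark_session=real_session)
    duplicate_error = None
except TypeError as exc:
    duplicate_error = exc
```

If the placeholder can still be present in `self.op_kwargs` at run time, keeping the merge form (or dropping the key before the call) avoids the `TypeError`.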



##########
airflow/providers/snowflake/decorators/snowpark.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Callable, Dict, Optional, Sequence
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import PythonOperator
+from airflow.providers.snowflake.operators.snowflake import get_db_hook
+
+if TYPE_CHECKING:
+    import snowflake
+
+    from airflow.decorators.base import TaskDecorator
+
+try:
+    import snowflake.snowpark  # noqa
+except ImportError as e:
+    # Snowpark is an optional feature and if imports are missing, it should be silently ignored
+    # As of Airflow 2.3 and above, the operator can raise AirflowOptionalProviderFeatureException
+    try:
+        from airflow.exceptions import AirflowOptionalProviderFeatureException
+    except ImportError:
+        # However, in order to keep backwards-compatibility with Airflow 2.1 and 2.2, if the
+        # 2.3 exception cannot be imported, the original ImportError should be raised.
+        # This try/except can be removed when the provider depends on Airflow >= 2.3.0
+        raise e from None
+    raise AirflowOptionalProviderFeatureException(e)
+
+
+class _SnowparkDecoratedOperator(DecoratedOperator, PythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param snowflake_conn_id: Reference to
+        :ref:`Snowflake connection id<howto/connection:snowflake>`
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :param warehouse: name of warehouse (will overwrite any warehouse defined in the connection's extra JSON)
+    :param database: name of database (will overwrite database defined in connection)
+    :param schema: name of schema (will overwrite schema defined in connection)
+    :param role: name of role (will overwrite any role defined in connection's extra JSON)
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :param session_parameters: You can set session-level parameters at the time you connect to Snowflake
+    :param python_callable: A reference to an object that is callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated)
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        *,
+        snowflake_conn_id: str = 'snowflake_default',
+        parameters: Optional[dict] = None,
+        warehouse: Optional[str] = None,
+        database: Optional[str] = None,
+        role: Optional[str] = None,
+        schema: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        python_callable,
+        op_args,
+        op_kwargs: Dict,
+        **kwargs,
+    ) -> None:
+        self.snowflake_conn_id = snowflake_conn_id
+        self.parameters = parameters
+        self.warehouse = warehouse
+        self.database = database
+        self.role = role
+        self.schema = schema
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+        kwargs_to_upstream = {
+            "python_callable": python_callable,
+            "op_args": op_args,
+            "op_kwargs": op_kwargs,
+        }
+        super().__init__(
+            kwargs_to_upstream=kwargs_to_upstream,
+            python_callable=python_callable,
+            op_args=op_args,
+            # airflow.decorators.base.DecoratedOperator checks if the functions are bindable, so we have to
+            # add an artificial value to pass the validations. The real value is determined at runtime.
+            op_kwargs={**op_kwargs, 'snowpark_session': None},
+            **kwargs,
+        )
+
+    def execute_callable(self):
+        hook = get_db_hook(self)
+        snowpark_session = hook.get_snowpark_session()
+        return self.python_callable(*self.op_args, **{**self.op_kwargs, 'snowpark_session': snowpark_session})

Review Comment:
   ```suggestion
           return self.python_callable(*self.op_args, **self.op_kwargs, snowpark_session=snowpark_session)
   ```
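The `execute_callable` above and the suggested one-liner behave differently in one edge case. Below is a minimal plain-Python sketch of the placeholder-then-inject pattern described in the `__init__` comment. It needs no Airflow; `my_task`, `stored_kwargs` and the `"SESSION"` string are hypothetical stand-ins.

The sketch shows three things:

1. `inspect.signature(...).bind` rejects a call that omits a required `snowpark_session` argument.
2. A `None` placeholder satisfies that check.
3. At run time, the dict-merge form replaces the placeholder with the real session.

The `__init__` above suggests that the stored `op_kwargs` keeps the placeholder key, although this depends on how `DecoratedOperator` stores it. If it does, the suggested keyword form would raise `TypeError` for a duplicate keyword at execute time.

```python
import inspect


def my_task(table_name, snowpark_session):
    """Hypothetical user callable that expects an injected session."""
    return f"{snowpark_session}:{table_name}"


op_args = []
op_kwargs = {"table_name": "orders"}

# 1. Parse time: no session exists yet, so binding the user's kwargs alone fails.
try:
    inspect.signature(my_task).bind(*op_args, **op_kwargs)
    bind_without_placeholder = True
except TypeError:
    bind_without_placeholder = False

# 2. A placeholder value lets the signature check pass (bind raises TypeError otherwise).
stored_kwargs = {**op_kwargs, "snowpark_session": None}
inspect.signature(my_task).bind(*op_args, **stored_kwargs)

# 3. Run time: the real session overrides the placeholder (in a dict merge, the later key wins).
session = "SESSION"  # stand-in for hook.get_snowpark_session()
result = my_task(*op_args, **{**stored_kwargs, "snowpark_session": session})

# The keyword form from the review suggestion fails if the placeholder key is still present.
try:
    my_task(*op_args, **stored_kwargs, snowpark_session=session)
    duplicate_error = None
except TypeError as exc:  # "... got multiple values for keyword argument 'snowpark_session'"
    duplicate_error = exc
```

If the placeholder is dropped from `op_kwargs` before execution, both forms produce the same call.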



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] sfc-gh-madkins commented on a diff in pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1067497268


##########
Dockerfile:
##########
@@ -1234,8 +1234,10 @@ ARG ADDITIONAL_PYTHON_DEPS=""
 # https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
 # * authlib, gcloud_aio_auth, adal are needed to generate constraints for PyPI packages and can be removed after we release
 #   new google, azure providers
+# * cloudpickle==2.0.0 is required by snowfalke-snowpark-python, which conflicts with new versions of apache beam, so
+#   we pin apache-beam also to the latest version which uses cloudpickle==2.0.0.
 # !!! MAKE SURE YOU SYNCHRONIZE THE LIST BETWEEN: Dockerfile, Dockerfile.ci, find_newer_dependencies.py
-ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7"
+ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7 cloudpickle==2.0.0 apache-beam==2.39.0"

Review Comment:
   @mik-laj where does the dependency for apache beam come from?
   
   Here is what I see as the snowpark dependency list:
   
   asn1crypto==1.5.1
   certifi==2022.12.7
   cffi==1.15.1
   charset-normalizer==2.1.1
   cloudpickle==2.0.0
   cryptography==38.0.4
   filelock==3.9.0
   idna==3.4
   oscrypto==1.3.0
   pycparser==2.21
   pycryptodomex==3.16.0
   PyJWT==2.6.0
   pyOpenSSL==22.1.0
   pytz==2022.7
   requests==2.28.1
   snowflake-connector-python==2.9.0
   snowflake-snowpark-python==1.0.0
   typing_extensions==4.4.0
   urllib3==1.26.14
   
   @mpgreg cc
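You can answer "where does this dependency come from?" locally by asking the installed environment which distributions declare it. Below is a minimal stdlib sketch, assuming Python 3.8+ `importlib.metadata`. `who_requires` is a hypothetical helper, and it only sees direct requirements of installed packages. `pip show <package>` (its "Required-by" field) gives similar information, and tools such as `pipdeptree` show the full tree.

```python
import re
from importlib import metadata


def normalize(name):
    """PEP 503 name normalization: lowercase, runs of '-', '_' or '.' become a single '-'."""
    return re.sub(r"[-_.]+", "-", name).lower()


# Leading project name of a requirement string such as "cloudpickle (>=1.6.0)" or "pyarrow>=6; extra == 'x'".
_NAME = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)")


def who_requires(target):
    """Return sorted names of installed distributions that directly declare `target` as a requirement."""
    wanted = normalize(target)
    dependants = set()
    for dist in metadata.distributions():
        for requirement in dist.requires or []:
            match = _NAME.match(requirement)
            if match and normalize(match.group(1)) == wanted:
                dependants.add(dist.metadata["Name"] or "<unknown>")
    return sorted(dependants)


if __name__ == "__main__":
    print("apache-beam required by:", who_requires("apache-beam"))
    print("cloudpickle required by:", who_requires("cloudpickle"))
```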





[GitHub] [airflow] sfc-gh-madkins commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1355093571

   > @sfc-gh-madkins any thoughts from eng on why the older version dep for Apache Beam and if/when they would bump that up?
   
   I don't see any dependencies on Apache Beam in the Snowpark repo directly, only on pyarrow.




[GitHub] [airflow] mik-laj commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1313111826

   @uranusjr @potiuk It is ready for review. I solved all the problems with installing the snowflake-snowpark-python package in Docker.
   
   @uranusjr Now the snowflake-snowpark-python package is required for apache-airflow-providers-snowflake, but it is only installed for Python 3.8 to simplify its installation.
   
   @potiuk I updated the `EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS` env var.
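The thread does not show how the "only for Python 3.8" restriction is declared. One common mechanism is a PEP 508 environment marker, which pip evaluates at install time. Below is a sketch that assumes a hypothetical base requirement list. It pairs the marker string with a plain-Python function that mirrors what the marker does.

```python
import sys

# Hypothetical base list for illustration; not the provider's real dependency set.
BASE_REQUIREMENTS = ["snowflake-connector-python"]

# PEP 508 form: pip skips this requirement on interpreters where the marker is false.
SNOWPARK_REQUIREMENT = 'snowflake-snowpark-python; python_version == "3.8"'


def requirements_for(version_info):
    """Plain-Python equivalent of the marker above for a given interpreter version tuple."""
    requirements = list(BASE_REQUIREMENTS)
    if tuple(version_info[:2]) == (3, 8):
        requirements.append("snowflake-snowpark-python")
    return requirements


if __name__ == "__main__":
    print(requirements_for(sys.version_info))
```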




[GitHub] [airflow] mik-laj commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r906792515


##########
tests/providers/snowflake/decorators/test_snowpark.py:
##########
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.utils import timezone
+
+try:
+    import snowflake.snowpark as snowpark
+    from snowflake.snowpark import Session
+except ImportError:
+    snowpark = None
+
+DEFAULT_DATE = timezone.datetime(2021, 9, 1)
+
+
+class TestDockerDecorator:
+    @unittest.skipIf(snowpark is None, 'snowpark package not present')

Review Comment:
   As far as I know, decorators from the `unittest` package cannot be used on classes.
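Below is a sketch of the same availability check using only the standard library. It keeps the skip on individual test methods, as in the diff. `module_available`, `SNOWPARK_AVAILABLE` and `TestSnowparkDecorator` are hypothetical names.

`importlib.util.find_spec` imports parent packages first. If the `snowflake` parent is missing, it raises `ModuleNotFoundError` instead of returning `None`, and the helper treats that case as "not available".

```python
import importlib.util
import unittest


def module_available(name):
    """Return True if `name` can be imported, without importing the module itself."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. "snowflake") is not installed.
        return False


SNOWPARK_AVAILABLE = module_available("snowflake.snowpark")


class TestSnowparkDecorator(unittest.TestCase):
    @unittest.skipIf(not SNOWPARK_AVAILABLE, "snowpark package not present")
    def test_requires_snowpark(self):
        # Only runs when the optional package is importable.
        self.assertTrue(SNOWPARK_AVAILABLE)

    def test_always_runs(self):
        # Tests that do not need snowpark stay active either way.
        self.assertIsInstance(SNOWPARK_AVAILABLE, bool)
```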





[GitHub] [airflow] uranusjr commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1004109798


##########
airflow/providers/snowflake/decorators/snowpark.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Callable, Dict, Optional, Sequence
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import PythonOperator
+from airflow.providers.snowflake.operators.snowflake import get_db_hook
+
+if TYPE_CHECKING:
+    import snowflake
+
+    from airflow.decorators.base import TaskDecorator
+
+try:
+    import snowflake.snowpark  # noqa
+except ImportError as e:
+    # Snowpark is an optional feature and if imports are missing, it should be silently ignored
+    # As of Airflow 2.3  and above the operator can throw OptionalProviderFeatureException

Review Comment:
   Nit
   
   ```suggestion
       # As of Airflow 2.3 and above the operator can throw AirflowOptionalProviderFeatureException.
   ```
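The import guard in this file uses a two-level fallback:

- When the core provides a dedicated exception, a missing optional dependency is translated into it.
- Otherwise, the original `ImportError` is re-raised.

Below is a generic sketch of that pattern. So that it runs without Airflow installed, it uses a hypothetical `OptionalProviderFeatureError` in place of Airflow's `AirflowOptionalProviderFeatureException`.

```python
import importlib


class OptionalProviderFeatureError(Exception):
    """Hypothetical stand-in for AirflowOptionalProviderFeatureException."""


def import_optional(module_name, feature_exception=OptionalProviderFeatureError):
    """Import an optional module, translating a missing dependency into a feature exception.

    Passing feature_exception=None mimics older cores where the dedicated exception
    cannot be imported: the original ImportError is then re-raised unchanged.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as err:
        if feature_exception is None:
            raise
        raise feature_exception(err) from err
```

Passing the exception class as a parameter only serves the demonstration. The diff above makes the decision by trying to import the Airflow 2.3 exception.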





[GitHub] [airflow] mik-laj commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1021081382


##########
Dockerfile:
##########
@@ -1234,8 +1234,10 @@ ARG ADDITIONAL_PYTHON_DEPS=""
 # https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
 # * authlib, gcloud_aio_auth, adal are needed to generate constraints for PyPI packages and can be removed after we release
 #   new google, azure providers
+# * cloudpickle==2.0.0 is required by snowfalke-snowpark-python, which conflicts with new versions of apache beam, so
+#   we pin pinning apache-beam also to the latest version which uses cloudpickle==2.0.0.

Review Comment:
   Fixed.





[GitHub] [airflow] kaxil commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1021562617


##########
docs/apache-airflow-providers-snowflake/decorators/snowpark.rst:
##########
@@ -0,0 +1,64 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/decorators:snowpark:
+
+``@task.snowpark``
+==============

Review Comment:
   ```suggestion
   ``@task.snowpark``
   ==================
   ```
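In reStructuredText, a section title's underline must be at least as long as the title text. Otherwise docutils emits "Title underline too short.", which is why the suggestion lengthens the `=` line. Below is a tiny hypothetical helper that builds a matching underline.

```python
def rst_heading(title, char="="):
    """Return a reStructuredText section title with an underline matching the title length."""
    return f"{title}\n{char * len(title)}"


def underline_too_short(title, underline):
    """True when the underline is shorter than the title (the case docutils warns about)."""
    return len(underline.rstrip()) < len(title)


print(rst_heading("``@task.snowpark``"))
```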





[GitHub] [airflow] potiuk commented on a diff in pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1022068572


##########
Dockerfile:
##########
@@ -1234,8 +1234,10 @@ ARG ADDITIONAL_PYTHON_DEPS=""
 # https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
 # * authlib, gcloud_aio_auth, adal are needed to generate constraints for PyPI packages and can be removed after we release
 #   new google, azure providers
+# * cloudpickle==2.0.0 is required by snowfalke-snowpark-python, which conflicts with new versions of apache beam, so
+#   we pin apache-beam also to the latest version which uses cloudpickle==2.0.0.
 # !!! MAKE SURE YOU SYNCHRONIZE THE LIST BETWEEN: Dockerfile, Dockerfile.ci, find_newer_dependencies.py
-ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7"
+ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7 cloudpickle==2.0.0 apache-beam==2.39.0"

Review Comment:
   I think it is not good to `pin` the requirements to specific versions. If we really want to limit them, we should rather use <= or < here.
   
   Why do we have "apache-beam==2.39.0" and "cloudpickle==2.0.0"? It seems an odd choice, and it will also heavily limit our capabilities. I believe the ARM image failure is caused by this one, and we are also waiting for a new version of apache-beam to be able to release Python 3.11 support. I think limiting those **just** to have a snowpark decorator is pretty wrong. Is there any reason snowpark cannot work with higher (including future) versions of those?
   
   To be perfectly honest, I think that if this were the consequence of accepting snowpark, then Snowflake should possibly release their own snowpark decorator as an add-on to the existing provider. Limiting us to pinned and potentially vulnerable versions of dependencies is not something we should treat lightly.
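Below is a small sketch that makes the concern concrete. It parses the `EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS` value from the diff and reports which entries are exact `==` pins rather than ranges. The parser is hypothetical and deliberately simplified: one operator per entry, no extras or markers. Real tooling would use `packaging.requirements.Requirement`.

```python
import re

# Value copied from the Dockerfile diff above.
EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS = (
    "dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 "
    "gcloud_aio_auth>=4.0.0 adal>=1.2.7 cloudpickle==2.0.0 apache-beam==2.39.0"
)

# name, operator (two-character operators tried first), version
_REQ = re.compile(r"^([A-Za-z0-9_.-]+)(==|>=|<=|~=|!=|<|>)(.+)$")


def parse_requirements(value):
    """Map each package name to its (operator, version) pair."""
    parsed = {}
    for token in value.split():
        match = _REQ.match(token)
        if match is None:
            raise ValueError(f"unsupported requirement: {token!r}")
        name, op, version = match.groups()
        parsed[name] = (op, version)
    return parsed


def exact_pins(parsed):
    """Return only the entries pinned to a single version with '=='."""
    return {name: version for name, (op, version) in parsed.items() if op == "=="}


print(exact_pins(parse_requirements(EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS)))
```

An exact pin admits a single release, while an upper bound such as `<` still admits compatible patch releases below the limit.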





[GitHub] [airflow] kaxil commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1277601674

   Hey @mik-laj, would love to get this one in. What are the pending items on it?




[GitHub] [airflow] github-actions[bot] commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1272662905

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] mpgreg commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
mpgreg commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1346202165

   @sfc-gh-madkins any thoughts from eng on why the older version dep for Apache Beam and if/when they would bump that up?




[GitHub] [airflow] github-actions[bot] commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1445232475

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] sfc-gh-madkins commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1340001495

   Any updates here, @mik-laj?




[GitHub] [airflow] mik-laj commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1287926611

   @kaxil The main challenge is that I got reports that SnowflakeOperator is not working in [the test package](https://github.com/apache/airflow/issues/24456#issuecomment-1225570524) so I am now working on running system tests to make sure this integration continues to work. https://github.com/apache/airflow/pull/27204




[GitHub] [airflow] mik-laj commented on pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1287928701

   I think it is not hard to fix a minimum scope, and then we will be sure that the integration works. This integration is problematic because it requires Python 3.8, so there's a non-zero possibility something will break over time.




[GitHub] [airflow] mik-laj commented on a diff in pull request #24652: Add @task.snowpark decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1021081277


##########
airflow/providers/snowflake/decorators/snowpark.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Callable, Dict, Optional, Sequence
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import PythonOperator
+from airflow.providers.snowflake.operators.snowflake import get_db_hook
+
+if TYPE_CHECKING:
+    import snowflake
+
+    from airflow.decorators.base import TaskDecorator
+
+try:
+    import snowflake.snowpark  # noqa
+except ImportError as e:
+    # Snowpark is an optional feature and if imports are missing, it should be silently ignored
+    # As of Airflow 2.3  and above the operator can throw OptionalProviderFeatureException

Review Comment:
   I made this package required, but it is installed only for Python 3.8, because our Docker image setup makes it difficult to have one extra package installed only in the Python 3.8 image.





[GitHub] [airflow] sfc-gh-madkins commented on a diff in pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
sfc-gh-madkins commented on code in PR #24652:
URL: https://github.com/apache/airflow/pull/24652#discussion_r1067527409


##########
Dockerfile:
##########
@@ -1234,8 +1234,10 @@ ARG ADDITIONAL_PYTHON_DEPS=""
 # https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
 # * authlib, gcloud_aio_auth, adal are needed to generate constraints for PyPI packages and can be removed after we release
 #   new google, azure providers
+# * cloudpickle==2.0.0 is required by snowfalke-snowpark-python, which conflicts with new versions of apache beam, so
+#   we pin apache-beam also to the latest version which uses cloudpickle==2.0.0.
 # !!! MAKE SURE YOU SYNCHRONIZE THE LIST BETWEEN: Dockerfile, Dockerfile.ci, find_newer_dependencies.py
-ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7"
+ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="dill<0.3.3 pyarrow>=6.0.0 protobuf<4.21.0 authlib>=1.0.0 gcloud_aio_auth>=4.0.0 adal>=1.2.7 cloudpickle==2.0.0 apache-beam==2.39.0"

Review Comment:
   @mpgreg it looks like cloudpickle is also blocking here, not apache-beam specifically.





[GitHub] [airflow] potiuk commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1314393461

   I think we really need to consider whether we want to make it part of the snowflake provider or an external one (a Snowflake add-on).
   
   We can probably make it an optional feature, but if that means you cannot install the snowflake provider together with the beam provider using the latest "apache-beam", then we are doing it wrong, because the snowflake provider would effectively prevent beam from using the latest version.
   
   I'm not sure how to approach it, but I have a feeling that having a "snowpark" add-on released separately by Snowflake is the best option.




[GitHub] [airflow] kaxil commented on pull request #24652: Add ``@task.snowpark`` decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1314056911

   Building prod images is failing

