You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/06/22 07:49:09 UTC

[airflow] branch main updated: Pattern parameter in S3ToSnowflakeOperator (#24571)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66e84001df Pattern parameter in S3ToSnowflakeOperator (#24571)
66e84001df is described below

commit 66e84001df069c76ba8bfefe15956c4018844b92
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jun 22 11:49:02 2022 +0400

    Pattern parameter in S3ToSnowflakeOperator (#24571)
---
 .../snowflake/transfers/s3_to_snowflake.py         |  9 +++++-
 airflow/providers/snowflake/utils/__init__.py      | 16 ++++++++++
 airflow/providers/snowflake/utils/common.py        | 37 ++++++++++++++++++++++
 .../operators/s3_to_snowflake.rst                  |  2 ++
 .../snowflake/transfers/test_s3_to_snowflake.py    |  7 +++-
 tests/providers/snowflake/utils/__init__.py        | 16 ++++++++++
 tests/providers/snowflake/utils/test_common.py     | 34 ++++++++++++++++++++
 .../providers/snowflake/example_snowflake.py       |  1 +
 8 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/snowflake/transfers/s3_to_snowflake.py b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
index 9b8eecb4dd..6bb32f3e56 100644
--- a/airflow/providers/snowflake/transfers/s3_to_snowflake.py
+++ b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
@@ -21,6 +21,7 @@ from typing import Any, Optional, Sequence
 
 from airflow.models import BaseOperator
 from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.providers.snowflake.utils.common import enclose_param
 
 
 class S3ToSnowflakeOperator(BaseOperator):
@@ -43,6 +44,9 @@ class S3ToSnowflakeOperator(BaseOperator):
         defined in the connection's extra JSON)
     :param database: reference to a specific database in Snowflake connection
     :param columns_array: reference to a specific columns array in snowflake database
+    :param pattern: regular expression pattern string specifying the file names and/or paths to match.
+        Note: the regular expression is automatically enclosed in single quotes,
+        and any single quotes in it are replaced with two single quotes.
     :param snowflake_conn_id: Reference to
         :ref:`Snowflake connection id<howto/connection:snowflake>`
     :param role: name of role (will overwrite any role defined in
@@ -71,6 +75,7 @@ class S3ToSnowflakeOperator(BaseOperator):
         file_format: str,
         schema: Optional[str] = None,
         columns_array: Optional[list] = None,
+        pattern: Optional[str] = None,
         warehouse: Optional[str] = None,
         database: Optional[str] = None,
         autocommit: bool = True,
@@ -90,6 +95,7 @@ class S3ToSnowflakeOperator(BaseOperator):
         self.file_format = file_format
         self.schema = schema
         self.columns_array = columns_array
+        self.pattern = pattern
         self.autocommit = autocommit
         self.snowflake_conn_id = snowflake_conn_id
         self.role = role
@@ -122,7 +128,8 @@ class S3ToSnowflakeOperator(BaseOperator):
             files = ", ".join(f"'{key}'" for key in self.s3_keys)
             sql_parts.append(f"files=({files})")
         sql_parts.append(f"file_format={self.file_format}")
-
+        if self.pattern:
+            sql_parts.append(f"pattern={enclose_param(self.pattern)}")
         copy_query = "\n".join(sql_parts)
 
         self.log.info('Executing COPY command...')
diff --git a/airflow/providers/snowflake/utils/__init__.py b/airflow/providers/snowflake/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/snowflake/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/snowflake/utils/common.py b/airflow/providers/snowflake/utils/common.py
new file mode 100644
index 0000000000..fc442ebc87
--- /dev/null
+++ b/airflow/providers/snowflake/utils/common.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+def enclose_param(param: str) -> str:
+    """
+    Replace all single quotes in parameter by two single quotes and enclose param in single quote.
+
+    .. seealso::
+        https://docs.snowflake.com/en/sql-reference/data-types-text.html#single-quoted-string-constants
+
+    Examples:
+     .. code-block:: python
+
+        enclose_param("without quotes")  # Returns: 'without quotes'
+        enclose_param("'with quotes'")  # Returns: '''with quotes'''
+        enclose_param("Today's sales projections")  # Returns: 'Today''s sales projections'
+        enclose_param("sample/john's.csv")  # Returns: 'sample/john''s.csv'
+        enclose_param(".*'awesome'.*[.]csv")  # Returns: '.*''awesome''.*[.]csv'
+
+    :param param: parameter which required single quotes enclosure.
+    """
+    return f"""'{param.replace("'", "''")}'"""
diff --git a/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst b/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
index a2b7bb762b..a655b85608 100644
--- a/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
+++ b/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
@@ -31,6 +31,8 @@ Similarly to the :class:`SnowflakeOperator <airflow.providers.snowflake.operator
 the additional relevant parameters to establish connection with your Snowflake instance.
 This operator will allow loading of one or more named files from a specific Snowflake stage (predefined S3 path). In order to do so
 pass the relevant file names to the ``s3_keys`` parameter and the relevant Snowflake stage to the ``stage`` parameter.
+``pattern`` can be used to specify a regular expression that the file names and/or paths must match
+(see `docs <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#loading-using-pattern-matching>`__).
 ``file_format`` can be used to either reference an already existing Snowflake file format or a custom string that defines
 a file format (see `docs <https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>`__).
 
diff --git a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
index 07defa10ec..413043cdaf 100644
--- a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
+++ b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
@@ -24,12 +24,13 @@ from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeO
 
 
 class TestS3ToSnowflakeTransfer:
+    @pytest.mark.parametrize("pattern", [None, '.*[.]csv'])
     @pytest.mark.parametrize("columns_array", [None, ['col1', 'col2', 'col3']])
     @pytest.mark.parametrize("s3_keys", [None, ['1.csv', '2.csv']])
     @pytest.mark.parametrize("prefix", [None, 'prefix'])
     @pytest.mark.parametrize("schema", [None, 'schema'])
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
-    def test_execute(self, mock_run, schema, prefix, s3_keys, columns_array):
+    def test_execute(self, mock_run, schema, prefix, s3_keys, columns_array, pattern):
         table = 'table'
         stage = 'stage'
         file_format = 'file_format'
@@ -42,6 +43,7 @@ class TestS3ToSnowflakeTransfer:
             file_format=file_format,
             schema=schema,
             columns_array=columns_array,
+            pattern=pattern,
             task_id="task_id",
             dag=None,
         ).execute(None)
@@ -62,5 +64,8 @@ class TestS3ToSnowflakeTransfer:
 
         copy_query += f"\nfile_format={file_format}"
 
+        if pattern:
+            copy_query += f"\npattern='{pattern}'"
+
         mock_run.assert_called_once()
         assert mock_run.call_args[0][0] == copy_query
diff --git a/tests/providers/snowflake/utils/__init__.py b/tests/providers/snowflake/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/snowflake/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/snowflake/utils/test_common.py b/tests/providers/snowflake/utils/test_common.py
new file mode 100644
index 0000000000..40f4d27556
--- /dev/null
+++ b/tests/providers/snowflake/utils/test_common.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.providers.snowflake.utils.common import enclose_param
+
+
+@pytest.mark.parametrize(
+    "param,expected",
+    [
+        ("without quotes", "'without quotes'"),
+        ("'with quotes'", "'''with quotes'''"),
+        ("Today's sales projections", "'Today''s sales projections'"),
+        ("sample/john's.csv", "'sample/john''s.csv'"),
+        (".*'awesome'.*[.]csv", "'.*''awesome''.*[.]csv'"),
+    ],
+)
+def test_parameter_enclosure(param, expected):
+    assert enclose_param(param) == expected
diff --git a/tests/system/providers/snowflake/example_snowflake.py b/tests/system/providers/snowflake/example_snowflake.py
index ae6dca0fe2..23bed306c5 100644
--- a/tests/system/providers/snowflake/example_snowflake.py
+++ b/tests/system/providers/snowflake/example_snowflake.py
@@ -103,6 +103,7 @@ with DAG(
         schema=SNOWFLAKE_SCHEMA,
         stage=SNOWFLAKE_STAGE,
         file_format="(type = 'CSV',field_delimiter = ';')",
+        pattern=".*[.]csv",
     )
 
     # [END howto_operator_s3_to_snowflake]