You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/06/22 07:49:09 UTC
[airflow] branch main updated: Pattern parameter in S3ToSnowflakeOperator (#24571)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66e84001df Pattern parameter in S3ToSnowflakeOperator (#24571)
66e84001df is described below
commit 66e84001df069c76ba8bfefe15956c4018844b92
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jun 22 11:49:02 2022 +0400
Pattern parameter in S3ToSnowflakeOperator (#24571)
---
.../snowflake/transfers/s3_to_snowflake.py | 9 +++++-
airflow/providers/snowflake/utils/__init__.py | 16 ++++++++++
airflow/providers/snowflake/utils/common.py | 37 ++++++++++++++++++++++
.../operators/s3_to_snowflake.rst | 2 ++
.../snowflake/transfers/test_s3_to_snowflake.py | 7 +++-
tests/providers/snowflake/utils/__init__.py | 16 ++++++++++
tests/providers/snowflake/utils/test_common.py | 34 ++++++++++++++++++++
.../providers/snowflake/example_snowflake.py | 1 +
8 files changed, 120 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/snowflake/transfers/s3_to_snowflake.py b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
index 9b8eecb4dd..6bb32f3e56 100644
--- a/airflow/providers/snowflake/transfers/s3_to_snowflake.py
+++ b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
@@ -21,6 +21,7 @@ from typing import Any, Optional, Sequence
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.providers.snowflake.utils.common import enclose_param
class S3ToSnowflakeOperator(BaseOperator):
@@ -43,6 +44,9 @@ class S3ToSnowflakeOperator(BaseOperator):
defined in the connection's extra JSON)
:param database: reference to a specific database in Snowflake connection
:param columns_array: reference to a specific columns array in snowflake database
+ :param pattern: regular expression pattern string specifying the file names and/or paths to match.
+ Note: the regular expression will be automatically enclosed in single quotes,
+ and every single quote in the expression will be replaced by two single quotes.
:param snowflake_conn_id: Reference to
:ref:`Snowflake connection id<howto/connection:snowflake>`
:param role: name of role (will overwrite any role defined in
@@ -71,6 +75,7 @@ class S3ToSnowflakeOperator(BaseOperator):
file_format: str,
schema: Optional[str] = None,
columns_array: Optional[list] = None,
+ pattern: Optional[str] = None,
warehouse: Optional[str] = None,
database: Optional[str] = None,
autocommit: bool = True,
@@ -90,6 +95,7 @@ class S3ToSnowflakeOperator(BaseOperator):
self.file_format = file_format
self.schema = schema
self.columns_array = columns_array
+ self.pattern = pattern
self.autocommit = autocommit
self.snowflake_conn_id = snowflake_conn_id
self.role = role
@@ -122,7 +128,8 @@ class S3ToSnowflakeOperator(BaseOperator):
files = ", ".join(f"'{key}'" for key in self.s3_keys)
sql_parts.append(f"files=({files})")
sql_parts.append(f"file_format={self.file_format}")
-
+ if self.pattern:
+ sql_parts.append(f"pattern={enclose_param(self.pattern)}")
copy_query = "\n".join(sql_parts)
self.log.info('Executing COPY command...')
diff --git a/airflow/providers/snowflake/utils/__init__.py b/airflow/providers/snowflake/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/snowflake/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/snowflake/utils/common.py b/airflow/providers/snowflake/utils/common.py
new file mode 100644
index 0000000000..fc442ebc87
--- /dev/null
+++ b/airflow/providers/snowflake/utils/common.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+def enclose_param(param: str) -> str:
+ """
+ Replace every single quote in the parameter with two single quotes and enclose the result in single quotes.
+
+ .. seealso::
+ https://docs.snowflake.com/en/sql-reference/data-types-text.html#single-quoted-string-constants
+
+ Examples:
+ .. code-block:: python
+
+ enclose_param("without quotes") # Returns: 'without quotes'
+ enclose_param("'with quotes'") # Returns: '''with quotes'''
+ enclose_param("Today's sales projections") # Returns: 'Today''s sales projections'
+ enclose_param("sample/john's.csv") # Returns: 'sample/john''s.csv'
+ enclose_param(".*'awesome'.*[.]csv") # Returns: '.*''awesome''.*[.]csv'
+
+ :param param: parameter that requires enclosure in single quotes.
+ """
+ return f"""'{param.replace("'", "''")}'"""
diff --git a/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst b/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
index a2b7bb762b..a655b85608 100644
--- a/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
+++ b/docs/apache-airflow-providers-snowflake/operators/s3_to_snowflake.rst
@@ -31,6 +31,8 @@ Similarly to the :class:`SnowflakeOperator <airflow.providers.snowflake.operator
the additional relevant parameters to establish connection with your Snowflake instance.
This operator will allow loading of one or more named files from a specific Snowflake stage (predefined S3 path). In order to do so
pass the relevant file names to the ``s3_keys`` parameter and the relevant Snowflake stage to the ``stage`` parameter.
+``pattern`` can be used to specify a regular expression that the file names and/or paths must match
+(see `docs <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#loading-using-pattern-matching>`__).
``file_format`` can be used to either reference an already existing Snowflake file format or a custom string that defines
a file format (see `docs <https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html>`__).
diff --git a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
index 07defa10ec..413043cdaf 100644
--- a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
+++ b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
@@ -24,12 +24,13 @@ from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeO
class TestS3ToSnowflakeTransfer:
+ @pytest.mark.parametrize("pattern", [None, '.*[.]csv'])
@pytest.mark.parametrize("columns_array", [None, ['col1', 'col2', 'col3']])
@pytest.mark.parametrize("s3_keys", [None, ['1.csv', '2.csv']])
@pytest.mark.parametrize("prefix", [None, 'prefix'])
@pytest.mark.parametrize("schema", [None, 'schema'])
@mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
- def test_execute(self, mock_run, schema, prefix, s3_keys, columns_array):
+ def test_execute(self, mock_run, schema, prefix, s3_keys, columns_array, pattern):
table = 'table'
stage = 'stage'
file_format = 'file_format'
@@ -42,6 +43,7 @@ class TestS3ToSnowflakeTransfer:
file_format=file_format,
schema=schema,
columns_array=columns_array,
+ pattern=pattern,
task_id="task_id",
dag=None,
).execute(None)
@@ -62,5 +64,8 @@ class TestS3ToSnowflakeTransfer:
copy_query += f"\nfile_format={file_format}"
+ if pattern:
+ copy_query += f"\npattern='{pattern}'"
+
mock_run.assert_called_once()
assert mock_run.call_args[0][0] == copy_query
diff --git a/tests/providers/snowflake/utils/__init__.py b/tests/providers/snowflake/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/snowflake/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/snowflake/utils/test_common.py b/tests/providers/snowflake/utils/test_common.py
new file mode 100644
index 0000000000..40f4d27556
--- /dev/null
+++ b/tests/providers/snowflake/utils/test_common.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.providers.snowflake.utils.common import enclose_param
+
+
+@pytest.mark.parametrize(
+ "param,expected",
+ [
+ ("without quotes", "'without quotes'"),
+ ("'with quotes'", "'''with quotes'''"),
+ ("Today's sales projections", "'Today''s sales projections'"),
+ ("sample/john's.csv", "'sample/john''s.csv'"),
+ (".*'awesome'.*[.]csv", "'.*''awesome''.*[.]csv'"),
+ ],
+)
+def test_parameter_enclosure(param, expected):
+ assert enclose_param(param) == expected
diff --git a/tests/system/providers/snowflake/example_snowflake.py b/tests/system/providers/snowflake/example_snowflake.py
index ae6dca0fe2..23bed306c5 100644
--- a/tests/system/providers/snowflake/example_snowflake.py
+++ b/tests/system/providers/snowflake/example_snowflake.py
@@ -103,6 +103,7 @@ with DAG(
schema=SNOWFLAKE_SCHEMA,
stage=SNOWFLAKE_STAGE,
file_format="(type = 'CSV',field_delimiter = ';')",
+ pattern=".*[.]csv",
)
# [END howto_operator_s3_to_snowflake]