Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/18 18:09:36 UTC

[GitHub] [airflow] mik-laj opened a new pull request #8436: Add Local Filesystem Secret Backend

mik-laj opened a new pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436
 
 
   This backend is especially useful in the following use cases:
   
   * during **development**: It keeps data synchronized between all terminal windows (as a database does),
     and at the same time the values are retained after a database restart (as with environment variables)
   * for **Kubernetes**: It allows you to store secrets in [Kubernetes Secrets](https://kubernetes.io/docs/concepts/configuration/secret/)
     or you can synchronize values using the sidecar container and
     [a shared volume](https://kubernetes.io/docs/tasks/access-application-cluster/communicate-containers-same-pod-shared-volume/) between two containers
   * if you're tired of defining all connections using a URI.
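
As a rough illustration of the development use case, the sketch below reads connection URIs from a JSON file on every lookup, so any process (or terminal window) pointing at the same path sees the same values, and they persist independently of the metadata database. The helper name `get_conn_uri_from_file` and the flat `conn_id` to URI layout are assumptions made for this example, not the backend's actual interface.

```python
import json
import os
import tempfile
from typing import Optional


def get_conn_uri_from_file(file_path: str, conn_id: str) -> Optional[str]:
    """Hypothetical helper: look up a connection URI in a JSON file."""
    # Re-read the file on each call so edits made elsewhere are visible.
    with open(file_path) as f:
        secrets = json.load(f)
    return secrets.get(conn_id)


# Usage: write a secrets file once, then read it from any process.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "connections.json")
    with open(path, "w") as f:
        json.dump({"my_db": "postgres://user:pass@host:5432/schema"}, f)
    found = get_conn_uri_from_file(path, "my_db")
    missing = get_conn_uri_from_file(path, "unknown")
```

Re-reading the file on each call trades a little I/O for always-fresh values; a real backend may well cache instead.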
   
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410740112
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
 
 Review comment:
   <img width="655" alt="Screenshot 2020-04-18 at 21 24 12" src="https://user-images.githubusercontent.com/12058428/79669319-ff93d500-81ba-11ea-8c71-95c15ad50f0f.png">
   

----------------------------------------------------------------

[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410733255
 
 

 ##########
 File path: airflow/secrets/base_secrets.py
 ##########
 @@ -43,7 +43,7 @@ def build_path(path_prefix: str, secret_id: str, sep: str = "/") -> str:
         """
         return f"{path_prefix}{sep}{secret_id}"
 
-    def get_conn_uri(self, conn_id: str) -> Optional[str]:
+    def get_conn_uri(self, conn_id: str) -> Optional[Union[str, List[str]]]:
 
 Review comment:
   I will revert it.

----------------------------------------------------------------

[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410739408
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
 
 Review comment:
   I would like to add support for env files and better error handling.
   
   Please look at this code:
   https://github.com/apache/airflow/blob/master/airflow/cli/commands/variable_command.py#L76-L79
   A generic exception is caught here, and the resulting error message is very inaccurate. This is not user-friendly.
   
   Beyond that, the code in the Web UI and CLI is duplicated. This would now be the third place that holds the logic for loading variables from a file.
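
To make the error-handling point concrete, here is a minimal sketch of a parser that records one issue per offending line (with its line number) instead of letting a generic exception surface. `SyntaxIssue` and `parse_env_content` are hypothetical names used only for this illustration; they are not the PR's `FileSyntaxError` or `_parser_env_file`.

```python
from typing import Dict, List, NamedTuple, Tuple


class SyntaxIssue(NamedTuple):
    """Hypothetical stand-in for a per-line syntax error record."""

    line_no: int
    message: str


def parse_env_content(content: str) -> Tuple[Dict[str, str], List[SyntaxIssue]]:
    """Parse KEY=VALUE lines, collecting issues rather than raising."""
    values: Dict[str, str] = {}
    issues: List[SyntaxIssue] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and comments
        # partition splits on the first "=" only, so values may contain "=".
        key, sep, value = stripped.partition("=")
        if not sep:
            issues.append(SyntaxIssue(line_no, 'Missing equal sign ("=").'))
            continue
        if not key:
            issues.append(SyntaxIssue(line_no, "Key is empty."))
            continue
        values[key] = value
    return values, issues


values, issues = parse_env_content("A=1\nbroken line\n# comment\n=oops\nB=x=y\n")
```

The caller can then report every problem at once, each tied to a line number, which is the kind of message a user can act on.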
   

----------------------------------------------------------------

[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410734516
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
 
 Review comment:
   I want to refactor the CLI and Web UI to use the same code in the next PR. For the Web UI, it would be useful for the user to paste the contents of the file instead of uploading the file.

----------------------------------------------------------------

[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410735536
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        # Split on the first "=" only, so values may themselves contain "=".
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parses the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
 
 Review comment:
   If we are already importing the `Connection` class, why do we need `CONNECTION_PARAMETERS_NAMES`? We could get the parameter names from the class itself, so there is no need to hardcode them.
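
One way to read this suggestion is to introspect the constructor signature rather than maintain a constant. The sketch below uses a small stand-in `Connection` class defined only for this example; whether the real class's constructor parameters match the hardcoded set is an assumption that would need checking against the Airflow source.

```python
import inspect
from typing import Optional, Set


class Connection:
    """Hypothetical stand-in for illustration; not Airflow's Connection."""

    def __init__(
        self,
        conn_id: Optional[str] = None,
        conn_type: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
        uri: Optional[str] = None,
    ) -> None:
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host
        self.port = port
        self.uri = uri


def constructor_parameter_names(cls: type) -> Set[str]:
    """Return the names of the constructor's parameters, excluding self."""
    params = inspect.signature(cls.__init__).parameters
    return {name for name in params if name != "self"}


allowed = constructor_parameter_names(Connection)
# Keys that the constructor would not accept, e.g. a misspelled "port".
unknown = {"host", "prot"} - allowed
```

The trade-off is that introspection requires importing the class at the point of use, which is what the "constants to limit cyclical imports" comment in the diff appears to be working around.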

----------------------------------------------------------------

[GitHub] [airflow] kaxil commented on issue #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#issuecomment-615927107
 
 
   Whenever this PR is merged, please create a PR to backport it to the v1-10-test branch too; it would be useful to get this into 1.10.11.

----------------------------------------------------------------

[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410735097
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        # Split on the first "=" only, so values may themselves contain "=".
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parses the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
 
 Review comment:
   I think we should pass the file path directly to `_parser_env_file` and `_parse_json_file` and let both the method handle opening of files. 
   
   How about using https://pypi.org/project/environs/ to read the `.env` file? It should reduce the parsing code in this PR, take care of ignoring comments, etc. WDYT?
   
   Also please rename `_parser_env_file` to `_parse_env_file`
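
A minimal sketch of what a path-based parser could look like, assuming the parser keeps returning a `(secrets, errors)` pair. The name `parse_json_file` and the `SyntaxIssue` tuple are hypothetical stand-ins for illustration (the PR itself uses `FileSyntaxError`); this is not the code from the PR.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for the (line_no, message) data carried by FileSyntaxError.
SyntaxIssue = Tuple[int, str]


def parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[SyntaxIssue]]:
    """Open and parse a JSON secrets file, returning (secrets, issues)."""
    # The parser opens the file itself instead of receiving its content.
    content = Path(file_path).read_text()
    if not content:
        return {}, [(1, "The file is empty.")]
    try:
        secrets = json.loads(content)
    except json.JSONDecodeError as e:
        return {}, [(e.lineno, e.msg)]
    if not isinstance(secrets, dict):
        return {}, [(1, "The file should contain the object.")]
    return secrets, []
```

With this shape the caller only picks a parser by extension and passes the path through; the trade-off is that the parsers can no longer be unit-tested on plain strings.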

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410736096
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
+
+    if isinstance(value, str):
+        return Connection(conn_id=conn_id, uri=value)
+    if isinstance(value, dict):
+        current_keys = set(value.keys())
+        if not current_keys.issubset(CONNECTION_PARAMETERS_NAMES):
+            illegal_keys = current_keys - CONNECTION_PARAMETERS_NAMES
+            illegal_keys_list = ", ".join(illegal_keys)
+            raise AirflowException(
+                f"The object has illegal keys: {illegal_keys_list}. "
+                f"The dictionary can only contain the following keys: {CONNECTION_PARAMETERS_NAMES}"
+            )
+
+        if "conn_id" in current_keys and conn_id != value["conn_id"]:
+            raise AirflowException(
+                f"Mismatch conn_id. "
+                f"The dictionary key has the value: {value['conn_id']}. "
+                f"The item has the value: {conn_id}."
+            )
+        value["conn_id"] = conn_id
+        return Connection(**value)
+    raise AirflowException(
+        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
+    )
+
+
+def load_variables(file_path: str) -> Dict[str, str]:
+    """
+    Load vaariable from text file.
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :rtype: Dict[str, List[str]]
+    """
+    log.debug("Loading variables")
+
+    secrets = _read_secret_file(file_path)
+    invalid_keys = [key for key, values in secrets.items() if isinstance(values, list) and len(values) != 1]
+    if invalid_keys:
+        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
+    variables = {key: values[0] if isinstance(values, list) else values for key, values in secrets.items()}
+    log.debug("Loaded %d variables: ", len(variables))
+    return variables
+
+
+def load_connections(file_path: str):
+    """
+    Load connection from text file.
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :return: A dictionary where the key contains a connection ID and the value contains a list of connections.
+    :rtype: Dict[str, List[airflow.models.connection.Connection]]
+    """
+    log.debug("Loading connection")
+
+    secrets: Dict[str, Any] = _read_secret_file(file_path)
+    connections_by_conn_id = defaultdict(list)
+    for key, secret_values in list(secrets.items()):
+        if isinstance(secret_values, list):
+            for secret_value in secret_values:
+                connections_by_conn_id[key].append(_create_connection(key, secret_value))
+        else:
+            connections_by_conn_id[key].append(_create_connection(key, secret_values))
+    num_conn = sum(map(len, connections_by_conn_id.values()))
+    log.debug("Loaded %d connections", num_conn)
+
+    return connections_by_conn_id
+
+
+class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieves Connection objects and Variables from local files
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :param variable_file_path: File location with variables data.
+    :type variable_file_path: str
+    :param connection_file_path: File location with connection data.
 
 Review comment:
   ```suggestion
       :param connections_file_path: File location with connection data.
   ```


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410733798
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
 
 Review comment:
   The description of the `content` parameter is missing.


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410734028
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
 
 Review comment:
   If the name is `_parser_env_file`, how about we take the file as an input?


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410735663
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
+
+    if isinstance(value, str):
+        return Connection(conn_id=conn_id, uri=value)
+    if isinstance(value, dict):
+        current_keys = set(value.keys())
+        if not current_keys.issubset(CONNECTION_PARAMETERS_NAMES):
+            illegal_keys = current_keys - CONNECTION_PARAMETERS_NAMES
+            illegal_keys_list = ", ".join(illegal_keys)
+            raise AirflowException(
+                f"The object has illegal keys: {illegal_keys_list}. "
+                f"The dictionary can only contain the following keys: {CONNECTION_PARAMETERS_NAMES}"
+            )
+
+        if "conn_id" in current_keys and conn_id != value["conn_id"]:
+            raise AirflowException(
+                f"Mismatch conn_id. "
+                f"The dictionary key has the value: {value['conn_id']}. "
+                f"The item has the value: {conn_id}."
+            )
+        value["conn_id"] = conn_id
+        return Connection(**value)
+    raise AirflowException(
+        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
+    )
+
+
+def load_variables(file_path: str) -> Dict[str, str]:
+    """
+    Load vaariable from text file.
 
 Review comment:
   ```suggestion
       Load variables from a text file.
   ```


[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410741048
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
 
 Review comment:
   environs uses the python-dotenv library to parse the file. Unfortunately, that library does not support duplicate keys, and in our case we need to handle multiple connections with the same connection ID.
   https://github.com/theskumar/python-dotenv/blob/master/src/dotenv/main.py#L81
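
To illustrate the duplicate-key requirement, here is a minimal sketch of a parser that keeps every value seen for a key. The function name `parse_env_lines` is hypothetical, and treating lines that start with `#` as comments is an assumption made for the example (the PR relies on `COMMENT_PATTERN` instead); error reporting is omitted for brevity.

```python
from collections import defaultdict
from typing import Dict, List


def parse_env_lines(content: str) -> Dict[str, List[str]]:
    """Collect every value seen for a key instead of keeping only the last one."""
    secrets: Dict[str, List[str]] = defaultdict(list)
    for line in content.splitlines():
        line = line.strip()
        # Assumption for this sketch: blank lines and "#" lines are skipped.
        if not line or line.startswith("#"):
            continue
        # partition splits on the first "=" only, so "=" inside a URI is preserved.
        key, sep, value = line.partition("=")
        if not sep or not key:
            continue  # A real parser would record a syntax error here.
        secrets[key].append(value)
    return dict(secrets)
```

A repeated connection ID then yields a list with one entry per line, which a dictionary that maps each key to a single value cannot represent.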


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410733413
 
 

 ##########
 File path: docs/howto/use-alternative-secrets-backend.rst
 ##########
 @@ -52,7 +52,105 @@ Set ``backend`` to the fully qualified class name of the backend you want to ena
 You can provide ``backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of
 your secrets backend.
 
-See :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>` for an example configuration.
+.. _local_disk_secrets:
+
+Local Filesystem Secrets Backend
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This backend is especially useful in the following use cases:
+
+* **development**: It ensures data synchronization between all terminal windows (same as databases),
+  and at the same time the values are retained after database restart (same as environment variable)
+* **Kubernetes**: It allows you to store secrets in `Kubernetes Secrets <https://kubernetes.io/docs/concepts/configuration/secret/>`__
+  or you can synchronize values using the sidecar container and
+  `a shared volume <https://kubernetes.io/docs/tasks/access-application-cluster/communicate-containers-same-pod-shared-volume/>`__
+* **JSON**: If you're tired of defining all connections using a URI, creating a JSON object is easier.
+
+To use variables and connections from a local file, specify :py:class:`~airflow.secrets.local_filesystem.LocalFilesystemBackend`
+as the ``backend`` in the ``[secrets]`` section of ``airflow.cfg``.
+
+Available parameters to ``backend_kwargs``:
+
+* ``variable_file_path``: File location with variables data.
+* ``connection_file_path``: File location with connection data.
+
+Here is a sample configuration:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.secrets.local_filesystem.LocalFilesystemBackend
+    backend_kwargs = {"variable_file_path": "/files/var.json", "connection_file_path": "/files/conn.json"}
+
+Both ``JSON`` and ``.env`` files are supported. All parameters are optional. If the file path is not passed,
+the backend returns an empty collection.
+
+Storing and Retrieving Connections
+""""""""""""""""""""""""""""""""""
+
+If you have set ``connection_file_path`` as ``/files/my_conn.json``, then the backend will read the
+file ``/files/my_conn.json`` when it looks for connections.
+
+The file can be defined in ``JSON`` or ``env`` format.
+
+The JSON file must contain an object where the key contains the connection ID and the value contains
+the definitions of one or more connections. The connection can be defined as a URL (string) or object.
+For a guide about defining a connection as a URI, see :doc:`generating_connection_uri`.
+For a description of the connection object parameters see :class:`~airflow.models.connection.Connection`.
+The following is a sample JSON file.
+
+.. code-block:: json
+
+    {
+        "CONN_A": "mysql://host_a",
+        "CONN_B": [
+            "mysql://host_a",
+            "mysql://host_a"
+        ],
+        "CONN_C": {
+            "conn_type": "scheme",
+            "host": "host",
+            "schema": "lschema",
+            "login": "Login",
+            "password": "None",
+            "port": "1234"
+        }
+    }
+
+You can also define connections using a ``.env`` file. Then the key is the connection ID, and
+the value should describe the connection using the URI. If the connection ID is repeated, all values will
+be returned. The following is a sample file.
+
+  .. code-block:: text
+
+    mysql_conn_id=mysql://log:password@13.1.21.1:3306/mysqldbrd
+    google_custom_key=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json
+
+Storing and Retrieving Variables
+""""""""""""""""""""""""""""""""
+
+If you have set ``variable_file_path`` as ``/files/my_var.json``, then the backend will read the
+file ``/files/my_var.json`` when it looks for variables.
+
+The file can be defined in ``JSON`` or ``env`` format.
+
+The JSON file must contain an object where the key contains the variable key and the value contains
+the variable value. The following is a sample JSON file.
+
+  .. code-block:: json
+
+    {
+        "VAR_A": "some_value",
+        "var_b": "different_value"
+    }
+
+You can also define variable using a ``.env`` file. Then the key is the variable key, and variable should
+describe the variable value. The following is a sample  file.
 
 Review comment:
   ```suggestion
   describe the variable value. The following is a sample file.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410733299
 
 

 ##########
 File path: docs/howto/use-alternative-secrets-backend.rst
 ##########
 @@ -52,7 +52,105 @@ Set ``backend`` to the fully qualified class name of the backend you want to ena
 You can provide ``backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of
 your secrets backend.
 
-See :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>` for an example configuration.
+.. _local_disk_secrets:
+
+Local Filesystem Secrets Backend
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This backend is especially useful in the following use cases:
+
+* **development**: It ensures data synchronization between all terminal windows (same as databases),
+  and at the same time the values are retained after database restart (same as environment variable)
+* **Kubernetes**: It allows you to store secrets in `Kubernetes Secrets <https://kubernetes.io/docs/concepts/configuration/secret/>`__
+  or you can synchronize values using the sidecar container and
+  `a shared volume <https://kubernetes.io/docs/tasks/access-application-cluster/communicate-containers-same-pod-shared-volume/>`__
+* **JSON**: If you're tired of defining all connections using a URI - creating JSON object is easier than using URI.
+
+To use variable and connection from local file, specify :py:class:`~airflow.secrets.local_filesystem.LocalFilesystemBackend`
+as the ``backend`` in  ``[secrets]`` section of ``airflow.cfg``.
+
+Available parameters to ``backend_kwargs``:
+
+* ``variable_file_path``: File location with variables data.
+* ``connection_file_path``: File location with connection data.
+
+Here is a sample configuration:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.secrets.local_filesystem.LocalFilesystemBackend
+    backend_kwargs = {"variable_file_path": "/files/var.json", "connection_file_path": "/files/conn.json"}
+
+Both ``JSON`` and ``.env`` files are supported. All parameters are optional. If the file path is not passed,
+the backend returns an empty collection.
+
+Storing and Retrieving Connections
+""""""""""""""""""""""""""""""""""
+
+If you have set ``connection_file_path`` as ``/files/my_conn.json``, then the backend will read the
+file ``/files/my_conn.json`` when it looks for connections.
+
+The file can be defined in ``JSON`` or ``env`` format.
+
+The JSON file must contain an object where the key contains the connection ID and the value contains
+the definitions of one or more connections. The connection can be defined as a URL (string) or object.
+For a guide about defining a connection as a URI, see :doc:`generating_connection_uri`.
+For a description of the connection object parameters see :class:`~airflow.models.connection.Connection`.
+The following is a sample JSON file.
+
+.. code-block:: json
+
+    {
+        "CONN_A": "mysql://host_a",
+        "CONN_B": [
+            "mysql://host_a",
+            "mysql://host_a"
+        ],
+        "CONN_C": {
+            "conn_type": "scheme",
+            "host": "host",
+            "schema": "lschema",
+            "login": "Login",
+            "password": "None",
+            "port": "1234"
+        }
+    }
+
+You can also define connections using a ``.env`` file. Then the key is the connection ID, and
+the value should describe the connection using the URI. If the connection ID is repeated, all values will
+be returned. The following is a sample file.
+
+  .. code-block:: text
+
+    mysql_conn_id=mysql//log:password@13.1.21.1:3306/mysqldbrd
 
 Review comment:
   ```suggestion
       mysql_conn_id=mysql://log:password@13.1.21.1:3306/mysqldbrd
   ```
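
The missing colon matters because the value is read as a URI. As a rough illustration using only the standard library's `urllib.parse.urlsplit` (an assumption for demonstration; it is not necessarily the exact parsing path that Airflow's `Connection` takes), the original line yields no scheme or host, while the suggested line yields both:

```python
from urllib.parse import urlsplit

# Value as written in the docs sample (no colon after the scheme).
broken = urlsplit("mysql//log:password@13.1.21.1:3306/mysqldbrd")
# Value from the suggestion above.
fixed = urlsplit("mysql://log:password@13.1.21.1:3306/mysqldbrd")

# Without "://" the whole string is treated as a path: no scheme, no host.
print(repr(broken.scheme), broken.hostname)

# With "://" the scheme, credentials, host, port and path are all recognised.
print(fixed.scheme, fixed.username, fixed.hostname, fixed.port, fixed.path)
```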


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410735705
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            f"Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
+
+    if isinstance(value, str):
+        return Connection(conn_id=conn_id, uri=value)
+    if isinstance(value, dict):
+        current_keys = set(value.keys())
+        if not current_keys.issubset(CONNECTION_PARAMETERS_NAMES):
+            illegal_keys = current_keys - CONNECTION_PARAMETERS_NAMES
+            illegal_keys_list = ", ".join(illegal_keys)
+            raise AirflowException(
+                f"The object has illegal keys: {illegal_keys_list}. "
+                f"The dictionary can only contain the following keys: {CONNECTION_PARAMETERS_NAMES}"
+            )
+
+        if "conn_id" in current_keys and conn_id != value["conn_id"]:
+            raise AirflowException(
+                f"Mismatch conn_id. "
+                f"The dictionary key has the value: {value['conn_id']}. "
+                f"The item has the value: {conn_id}."
+            )
+        value["conn_id"] = conn_id
+        return Connection(**value)
+    raise AirflowException(
+        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
+    )
+
+
+def load_variables(file_path: str) -> Dict[str, str]:
+    """
+    Load variables from text file.
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :rtype: Dict[str, List[str]]
+    """
+    log.debug("Loading variables")
 
 Review comment:
   ```suggestion
       log.debug("Loading variables from a text file")
   ```
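
For context on what this log line precedes: the remainder of `load_variables` in this PR collapses each key to a single value and rejects keys that carry several. The sketch below restates that rule in isolation. `flatten_variables` is a hypothetical name, and `ValueError` stands in for `AirflowException` so the snippet runs without Airflow installed.

```python
from typing import Any, Dict


def flatten_variables(secrets: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the single-value rule for variables."""
    # A list appears when the .env parser grouped repeated keys together.
    invalid_keys = [
        key for key, values in secrets.items()
        if isinstance(values, list) and len(values) != 1
    ]
    if invalid_keys:
        raise ValueError(f"Multiple values for keys: {invalid_keys}")
    # Unwrap one-element lists; leave plain (JSON) values untouched.
    return {
        key: values[0] if isinstance(values, list) else values
        for key, values in secrets.items()
    }


print(flatten_variables({"VAR_A": ["some_value"], "var_b": "different_value"}))
```

A key that appears twice in a `.env` file would therefore be an error for variables, even though the same situation is accepted for connections.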


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410736676
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+"""
+Objects relating to retrieving connections and variables from local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
 
 Review comment:
   The Variables uploaded from the UI are saved to DB. Where will this code be reused?
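
Separately from the reuse question, one detail of `_parser_env_file` may be worth checking. The sketch below is plain Python, independent of Airflow, and shows how a call such as `line.split("=", 2)` behaves when the value itself contains an equals sign, as a connection URI with query parameters does. Using `maxsplit=1` is only an assumption about the intended behaviour, not something stated in the PR.

```python
# A line shaped like the docstring example: the URI's query string contains "=".
line = "MY_CONN_ID=my-conn-type://my-host/my-schema?param1=val1&param2=val2"

# maxsplit=2 allows up to three parts, so a "len(parts) != 2" check rejects this line.
parts_maxsplit_2 = line.split("=", 2)
print(len(parts_maxsplit_2))

# maxsplit=1 splits only on the first "=", keeping the rest of the value intact.
parts_maxsplit_1 = line.split("=", 1)
print(parts_maxsplit_1)
```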


[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410738042
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+"""
+Objects relating to retrieving connections and variables from local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
 
 Review comment:
   I need to check if this library allows duplicate keys. 
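
As a point of comparison for that check, this is roughly the behaviour the hand-written parser in this PR provides and that any replacement would presumably need to keep: repeated keys are collected into a list rather than overwritten. The function below is a hypothetical, simplified stand-in (no comment handling, no error reporting), not the PR's code.

```python
from collections import defaultdict
from typing import Dict, List


def group_env_lines(content: str) -> Dict[str, List[str]]:
    """Hypothetical stand-in: group KEY=VALUE lines, keeping duplicate keys."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for line in content.splitlines():
        if not line:
            continue  # skip empty lines
        key, _, value = line.partition("=")
        grouped[key].append(value)
    return dict(grouped)


sample = "CONN_A=mysql://host_a\nCONN_A=mysql://host_b\nCONN_B=postgres://host_c"
print(group_env_lines(sample))
```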


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410736070
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+"""
+Objects relating to retrieving connections and variables from local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Based on the file extension format, selects a parser and parse the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            f"Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
+
+    if isinstance(value, str):
+        return Connection(conn_id=conn_id, uri=value)
+    if isinstance(value, dict):
+        current_keys = set(value.keys())
+        if not current_keys.issubset(CONNECTION_PARAMETERS_NAMES):
+            illegal_keys = current_keys - CONNECTION_PARAMETERS_NAMES
+            illegal_keys_list = ", ".join(illegal_keys)
+            raise AirflowException(
+                f"The object has illegal keys: {illegal_keys_list}. "
+                f"The dictionary can only contain the following keys: {CONNECTION_PARAMETERS_NAMES}"
+            )
+
+        if "conn_id" in current_keys and conn_id != value["conn_id"]:
+            raise AirflowException(
+                f"Mismatch conn_id. "
+                f"The dictionary key has the value: {value['conn_id']}. "
+                f"The item has the value: {conn_id}."
+            )
+        value["conn_id"] = conn_id
+        return Connection(**value)
+    raise AirflowException(
+        f"Unexpected value type: {type(value)}. The connection can only be defined using a string or object."
+    )
+
+
+def load_variables(file_path: str) -> Dict[str, str]:
+    """
+    Load variables from text file.
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :rtype: Dict[str, List[str]]
+    """
+    log.debug("Loading variables")
+
+    secrets = _read_secret_file(file_path)
+    invalid_keys = [key for key, values in secrets.items() if isinstance(values, list) and len(values) != 1]
+    if invalid_keys:
+        raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
+    variables = {key: values[0] if isinstance(values, list) else values for key, values in secrets.items()}
+    log.debug("Loaded %d variables: ", len(variables))
+    return variables
+
+
+def load_connections(file_path: str):
+    """
+    Load connection from text file.
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :return: A dictionary where the key contains a connection ID and the value contains a list of connections.
+    :rtype: Dict[str, List[airflow.models.connection.Connection]]
+    """
+    log.debug("Loading connection")
+
+    secrets: Dict[str, Any] = _read_secret_file(file_path)
+    connections_by_conn_id = defaultdict(list)
+    for key, secret_values in list(secrets.items()):
+        if isinstance(secret_values, list):
+            for secret_value in secret_values:
+                connections_by_conn_id[key].append(_create_connection(key, secret_value))
+        else:
+            connections_by_conn_id[key].append(_create_connection(key, secret_values))
+    num_conn = sum(map(len, connections_by_conn_id.values()))
+    log.debug("Loaded %d connections", num_conn)
+
+    return connections_by_conn_id
+
+
+class LocalFilesystemBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieves Connection objects and Variables from local files
+
+    Both ``JSON`` and ``.env`` files are supported.
+
+    :param variable_file_path: File location with variables data.
 
 Review comment:
   ```suggestion
       :param variables_file_path: File location with variables data.
   ```
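
The parameter name matters beyond the docstring because, as the documentation in this PR states, `backend_kwargs` is JSON that is passed as keyword arguments to the backend's `__init__`. A minimal sketch of the consequence, using a hypothetical `DemoBackend` class (not the class from this PR) and assuming the plural names `variables_file_path` and `connections_file_path`:

```python
import json


class DemoBackend:
    """Hypothetical stand-in for a secrets backend; not the class from this PR."""

    def __init__(self, variables_file_path=None, connections_file_path=None):
        self.variables_file_path = variables_file_path
        self.connections_file_path = connections_file_path


# Key matches the parameter name: the path reaches the backend.
backend = DemoBackend(**json.loads('{"variables_file_path": "/files/var.json"}'))
print(backend.variables_file_path)

# Key uses the other spelling: Python rejects the unknown keyword argument.
try:
    DemoBackend(**json.loads('{"variable_file_path": "/files/var.json"}'))
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

Whichever spelling is chosen, the docstring, the constructor signature, and the sample `airflow.cfg` need to agree.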


[GitHub] [airflow] kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410735097
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+"""
+Objects relating to retrieving connections and variables from local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+   .. code-block:: text
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Selects a parser based on the file extension and parses the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
 
 Review comment:
   I think we should pass the file path directly to `_parser_env_file` and `_parse_json_file` and let each function handle opening the file itself.
   
   How about using https://pypi.org/project/environs/ to read the `.env` file? That should reduce the parsing code in this PR, including the handling of comments.
   
   Also, please rename `_parser_env_file` to `_parse_env_file`.
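
A minimal sketch of the first suggestion, using only the standard library. The names `parse_env_file` and `parse_json_file` are hypothetical, and plain error strings stand in for the PR's `FileSyntaxError`; this is an illustration under those assumptions, not the PR's implementation.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[str]]:
    """Open and parse a .env file (hypothetical helper, not the PR's code)."""
    secrets: Dict[str, List[str]] = defaultdict(list)
    errors: List[str] = []
    with open(file_path) as f:
        for line_no, line in enumerate(f.read().splitlines(), 1):
            # Skip blank lines and comment lines.
            if not line.strip() or line.lstrip().startswith("#"):
                continue
            # partition() splits on the first "=" only, so values may contain "=".
            key, sep, value = line.partition("=")
            if not sep or not key:
                errors.append(f"line {line_no}: expected KEY=VALUE")
                continue
            secrets[key].append(value)
    return dict(secrets), errors


def parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[str]]:
    """Open and parse a JSON file (hypothetical helper, not the PR's code)."""
    with open(file_path) as f:
        try:
            secrets = json.load(f)
        except json.JSONDecodeError as e:
            return {}, [f"line {e.lineno}: {e.msg}"]
    if not isinstance(secrets, dict):
        return {}, ["line 1: the file should contain a JSON object"]
    return secrets, []
```

With this shape, the dispatcher would only need to pick a parser by extension and pass the path through, at the cost of the parsers no longer being testable on plain strings.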

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8436: Add Local Filesystem Secret Backend
URL: https://github.com/apache/airflow/pull/8436#discussion_r410737827
 
 

 ##########
 File path: airflow/secrets/local_filesystem.py
 ##########
 @@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Objects relating to retrieving connections and variables from a local file
+"""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Optional, Tuple
+
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.secrets.base_secrets import BaseSecretsBackend
+from airflow.utils.file import COMMENT_PATTERN
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+# constants to limit cyclical imports
+CONNECTION_PARAMETERS_NAMES = {
+    "conn_id",
+    "conn_type",
+    "host",
+    "login",
+    "password",
+    "schema",
+    "port",
+    "extra",
+    "uri",
+}
+
+log = logging.getLogger(__name__)
+
+
+def _parser_env_file(content: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    secrets: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments:
+            continue
+
+        # Split on the first "=" only, so values that contain "=" (e.g. URI query strings) stay intact
+        var_parts: List[str] = line.split("=", 1)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(FileSyntaxError(line_no=line_no, message="Invalid line format. Key is empty.",))
+        secrets[key].append(value)
+    return secrets, errors
+
+
+def _parse_json_file(content: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param content:
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        secrets = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(secrets, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return secrets, []
+
+
+def _parse_secret_file(file_content: str, file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Selects a parser based on the file extension and parses the file.
+
+    :param file_content:
+    :param file_path:
+    :return:
+    """
+    if file_path.lower().endswith(".env"):
+        secrets, parse_errors = _parser_env_file(file_content)
+    elif file_path.lower().endswith(".json"):
+        secrets, parse_errors = _parse_json_file(file_content)
+    else:
+        raise AirflowException("Unsupported file format. The file must have the extension .env or .json")
+    return secrets, parse_errors
+
+
+def _read_secret_file(file_path: str) -> Dict[str, str]:
+    if not os.path.exists(file_path):
+        raise AirflowException(
+            f"File {file_path} was not found. Check the configuration of your secret backend."
+        )
+
+    log.debug("Reading file: %s", file_path)
+    with open(file_path) as f:
+        file_content = f.read()
+
+    log.debug("Parsing file: %s", file_path)
+    secrets, parse_errors = _parse_secret_file(file_content, file_path)
+    log.debug("Parsed file: len(parse_errors)=%d, len(secrets)=%d", len(parse_errors), len(secrets))
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+        )
+    return secrets or {}
+
+
+def _create_connection(conn_id: str, value: Any):
+    """
+    Creates a connection based on a URL or object.
+    """
+    from airflow.models import Connection
 
 Review comment:
   I ran into cyclical imports here before. Maybe the problem has since disappeared; I'll check.
