Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/18 23:36:14 UTC

[GitHub] [airflow] natanweinberger opened a new pull request #15425: Fix CLI connections import

natanweinberger opened a new pull request #15425:
URL: https://github.com/apache/airflow/pull/15425


   This is a bug fix for the CLI command `connections_import`. I originally added this CLI functionality in #15177. The bug was reported by @mudravrik [here](https://github.com/apache/airflow/pull/15177#issuecomment-822056108).
   
   ## Root cause
   I called `load_connections_dict` with the connections filepath and thought it returned a list of dictionaries. During the loop in which I added and committed the connections to the DB, I called the dictionary method `.items()` on each connection. However, each connection was actually stored into a `Connection` model instance rather than a dictionary, so the call I made to `.items()` resulted in an AttributeError.
   
   ## Solution
   On closer inspection, I realized the code block that contained the bug can be removed altogether. Because `load_connections_dict` already returns `Connection` models (rather than dictionaries), there is no need to examine the keys and values of each dictionary; I had only done that to safely load the data into `Connection` models in the first place.
   
   To avoid future issues, I also changed how the tests provide sample data. Before, a dictionary of sample data was set as the return value of `load_connections_dict`; I hadn't noticed that the real function returns `Connection` instances. Now, that dictionary is set as the return value of a lower-level function, `_parse_secret_file`, which does no heavy lifting beyond parsing a JSON, YAML or env file.
   
   Bug originally introduced in #15177, which closed #9855.
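
A minimal sketch of the failure mode described under "Root cause". It does not use Airflow: `FakeConnection` and `load_connections` are hypothetical stand-ins for the `Connection` model and `load_connections_dict`, and only illustrate why calling `.items()` on a model instance fails.

```python
class FakeConnection:
    """Hypothetical stand-in for airflow.models.Connection (not the real model)."""

    def __init__(self, conn_id, conn_type=None, host=None):
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host


def load_connections(raw):
    """Hypothetical loader: maps conn_id -> FakeConnection, not conn_id -> dict."""
    return {conn_id: FakeConnection(conn_id, **values) for conn_id, values in raw.items()}


raw = {"my_db": {"conn_type": "postgres", "host": "localhost"}}
connections = load_connections(raw)

# Buggy assumption: treating each value as a dict fails on a model instance.
try:
    connections["my_db"].items()
    error = None
except AttributeError as exc:
    error = exc

# Fix: the values are already model instances, so they can be used directly.
imported = list(connections.values())
```

Mocking the loader's return value with plain dictionaries in a test would hide this mismatch, which is why the sample data is better injected at a lower level.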


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r641513080



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -238,39 +238,24 @@ def connections_delete(args):
 
 @cli_utils.action_logging
 def connections_import(args):
-    """Imports connections from a given file"""
+    """Imports connections from a file"""
     if os.path.exists(args.file):
         _import_helper(args.file)
     else:
         raise SystemExit("Missing connections file.")
 
 
 def _import_helper(file_path):
-    """Helps import connections from a file"""
-    connections_dict = load_connections_dict(file_path)
+    """Load connections from a file and save them to the DB. On collision, skip."""
+    connections_dict = _parse_secret_file(file_path)
     with create_session() as session:
-        for conn_id, conn_values in connections_dict.items():
+        for conn_id, conn_dict in connections_dict.items():
             if session.query(Connection).filter(Connection.conn_id == conn_id).first():
                 print(f'Could not import connection {conn_id}: connection already exists.')
                 continue
 
-            allowed_fields = [
-                'extra',
-                'description',
-                'conn_id',
-                'login',
-                'conn_type',
-                'host',
-                'password',
-                'schema',
-                'port',
-                'uri',
-                'extra_dejson',
-            ]
-            filtered_connection_values = {
-                key: value for key, value in conn_values.items() if key in allowed_fields
-            }
-            connection = _create_connection(conn_id, filtered_connection_values)
+            # Add the connection to the DB

Review comment:
       For compat we should add this
   
   ```suggestion
               if "extra_dejson" in conn_dict:
                   conn_dict["extra"] = conn_dict.pop("extra_dejson")
               # Add the connection to the DB
   ```
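
The compatibility shim suggested above can be sketched in isolation. This is an illustration under assumptions, not the code that was merged: `normalize_conn_dict` is a hypothetical helper name, and unlike the in-place suggestion it works on a copy so the caller's dictionary is left untouched.

```python
def normalize_conn_dict(conn_dict):
    """Return a copy in which a legacy 'extra_dejson' key is renamed to 'extra'."""
    normalized = dict(conn_dict)  # shallow copy; the input is not mutated
    if "extra_dejson" in normalized:
        normalized["extra"] = normalized.pop("extra_dejson")
    return normalized


legacy = {"conn_type": "http", "extra_dejson": {"timeout": 30}}
normalized = normalize_conn_dict(legacy)
```

Files that never contained `extra_dejson` pass through unchanged, so the shim only affects manually edited import files.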







[GitHub] [airflow] ashb commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-850458396


   > (by the way, taking a closer look at `connections export` function - it will never export a field `extra_dejson`, so it might be ok to let the import function assume it will never be present. It can only be present if someone manually edits the export to include it)
   
   Yeah, some people use `connections import` as a way to create connections, so given that it _used_ to accept `extra_dejson` in a manually created file, we should carry on accepting it.





[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615929569



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       What do you think about adding this as a static method on the Connection class, e.g. `Connection.from_dict`?







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, should I avoid any dependencies on `secrets`? The existing function `load_connections_dict` calls a few helper functions. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py`
   - duplicate the same functionality in the cli module
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-833725210


   [The Workflow run](https://github.com/apache/airflow/actions/runs/817717030) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] natanweinberger commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834672218


   Hey @ashb, thanks for taking a look! I appreciate it.
   
   I want to point out that most changes here are based on the feedback from @mik-laj above in this PR. I can change or undo some of these things, but it'll be undoing some of the work that was based on that feedback, so I just want to make sure we're all on the same page.
   
   Moving the secrets parsing logic to a utils file: https://github.com/apache/airflow/pull/15425#discussion_r625638195
   I haven't changed much (if any) code in the parsing, just moved it to a new file (`utils/parse.py`) so that it's not coupled to the secrets backend. Effectively, copied and pasted. I can commit those suggested changes that improve efficiency or wording of it here though.
   
   Add a method `Connection.from_dict()`: https://github.com/apache/airflow/pull/15425#discussion_r615929569
   
   What do you think? Do you want me to revert anything?





[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834672218


   Hey @ashb, thanks for taking a look! I appreciate it.
   
   I want to point out that some changes discussed above are based on feedback from @mik-laj in this PR. I can change or undo some of these things, but it'll be undoing some of the work that was based on that feedback, so I just want to make sure we're all on the same page.
   
   Moving the secrets parsing logic to a utils file: https://github.com/apache/airflow/pull/15425#discussion_r625638195
   I haven't changed much (if any) code in the parsing, just moved it to a new file (`utils/parse.py`) so that it's not coupled to the secrets backend. Effectively, cut and pasted. I can commit those suggested changes that improve efficiency or wording of it here though.
   
   Add a method `Connection.from_dict()`: https://github.com/apache/airflow/pull/15425#discussion_r615929569
   
   What do you think? Do you want me to revert anything?
   
   Update: Have implemented all other suggestions!





[GitHub] [airflow] natanweinberger commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-850441089


   @ashb I've switched back to using `load_connections_dict`, which handles the logic for loading the connection values into the Connection model. So the import function just loops through the loaded connections, adds them to the session, and commits them.
   
   This should be very similar now to the first iteration of this PR, just a bugfix for properly treating the `load_connections_dict` result as a `Connection` instance instead of a dict (and updating the tests).
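
The import loop described here, including the skip-on-collision behaviour from the `_import_helper` docstring, can be sketched without a database. This is a simplified illustration: a plain dictionary stands in for the session and table, and `import_connections` is a hypothetical name.

```python
def import_connections(existing, incoming):
    """Add incoming connections to `existing`, skipping IDs already present.

    Returns the list of skipped connection IDs.
    """
    skipped = []
    for conn_id, conn in incoming.items():
        if conn_id in existing:
            # Collision: keep the stored connection and report the skip.
            skipped.append(conn_id)
            continue
        existing[conn_id] = conn
    return skipped


# A dict stands in for the connections table in this sketch.
db = {"a": "conn-a"}
skipped = import_connections(db, {"a": "conn-a-new", "b": "conn-b"})
```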





[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r625638195



##########
File path: airflow/models/connection.py
##########
@@ -364,3 +364,14 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def from_dict(cls, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        return Connection(**conn_dict)

Review comment:
       We have a function that has a similar purpose but is more complex and supports more input parameters. See `airflow.secrets.local_filesystem._create_connection`. Can you move it here?







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834850512


   [The Workflow run](https://github.com/apache/airflow/actions/runs/821823044) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r619284944



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       This change is now implemented.







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, should I avoid any dependencies on `secrets`? The existing function `load_connections_dict` calls a few helper functions in order to deserialize data from a file. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py`
   - duplicate the same functionality in the cli module
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, in order to avoid any dependencies on `secrets`, I'll need to move several helper functions. The existing function `load_connections_dict` accepts a filepath and relies on `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file` to read the file contents and deserialize them.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py` (if so, into a utils file perhaps)
   - duplicate the same functionality in the cli module (seems like a bad idea)
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, in order to avoid any dependencies on `secrets`, I'll need to move several helper functions. The existing function `load_connections_dict` accepts a filepath and relies on `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file` to read the file contents and deserialize them.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py` (if so, into a utils file perhaps)
   - duplicate the same functionality in the cli module (seems like a bad idea)
   - create the method in the Connection class, but still import from `secrets` (doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633819663



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "

Review comment:
       Implemented 👍

##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(
+                f"The object have illegal keys: {illegal_keys_list}. "

Review comment:
       Implemented 👍
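
The validation approach in the hunk above (derive the allowed keys from the constructor signature, then reject anything else) can be tried outside Airflow. The sketch below makes several assumptions: `FakeConnection` is a hypothetical stand-in for the model, built-in `TypeError` and `ValueError` replace `AirflowException`, and the handling of `conn_id` and `extra_dejson` after validation is a guess, since the hunk is truncated before that point.

```python
from inspect import signature


class FakeConnection:
    """Hypothetical stand-in for the Connection model."""

    def __init__(self, conn_id=None, conn_type=None, host=None, login=None, extra=None):
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host
        self.login = login
        self.extra = extra


def parameter_names(cls):
    """Return the constructor parameter names of `cls`, excluding `self`."""
    return {k for k in signature(cls.__init__).parameters if k != "self"}


def from_dict(cls, conn_id, conn_dict):
    """Build an instance of `cls` from a dictionary after validating its keys."""
    if not isinstance(conn_dict, dict):
        raise TypeError(f"Unexpected conn_dict type: {type(conn_dict)}.")

    allowed = parameter_names(cls) | {"extra_dejson"}
    illegal = set(conn_dict) - allowed
    if illegal:
        raise ValueError(f"Illegal keys: {', '.join(sorted(illegal))}")

    kwargs = dict(conn_dict)
    if "extra_dejson" in kwargs:
        # Assumption: the legacy key is mapped onto `extra`.
        kwargs["extra"] = kwargs.pop("extra_dejson")
    kwargs["conn_id"] = conn_id  # assumption: the explicit ID takes precedence
    return cls(**kwargs)


conn = from_dict(FakeConnection, "my_db", {"conn_type": "postgres", "host": "localhost"})

try:
    from_dict(FakeConnection, "bad", {"bogus": 1})
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Deriving the allowed set from the signature keeps the validation in sync with the constructor, at the cost of coupling it to parameter names.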







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-849688714


   @ashb Thanks for the feedback. As we discussed, I simplified this PR a lot by reverting some of the changes and focusing on the bugfix.
   
   Here's a summary of the changes:
   
   - the `Connection` constructor handles non-strings in the `extra` kwarg
   - the connections import function uses the existing file parsing logic and passes the loaded dicts to `Connection.__init__()`
   - fixed a test to catch the issue that slipped through originally (root cause is summarized in the first comment in this PR)
   
   Much simpler!
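
The first bullet in the summary does not say how non-string `extra` values are handled. One plausible reading, shown here purely as an assumption, is that the constructor serializes them to JSON so the stored value is always a string. `FakeConnection` is a hypothetical stand-in, not the Airflow model.

```python
import json


class FakeConnection:
    """Hypothetical stand-in for the Connection model; only `extra` is modelled."""

    def __init__(self, conn_id, extra=None):
        self.conn_id = conn_id
        # Assumption: non-string values are serialized to JSON so that
        # `extra` is always stored as a string (or None).
        if extra is not None and not isinstance(extra, str):
            extra = json.dumps(extra)
        self.extra = extra


from_mapping = FakeConnection("a", extra={"timeout": 30})
from_string = FakeConnection("b", extra='{"timeout": 30}')
```

With this behaviour, a YAML or JSON import file can supply `extra` as a nested object rather than a pre-serialized string.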





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633846608



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       I've been poking around on this and running into an issue with the existing tests.
   
   The tests are using `unittest.mock.mock_open` to patch the call to `open`. This works when you want to mock reading data from a file using `read()` or `readlines()`, which is the existing implementation.
   
   However, `unittest.mock.mock_open` did not support iterating over the lines of a file until Python 3.8, so on earlier versions the mocked handle's `__iter__` method doesn't let us read the lines lazily in tests.
   
   See here: https://docs.python.org/3/library/unittest.mock.html#mock-open
   ```
   Changed in version 3.8: Added __iter__() to implementation so that iteration (such as in for loops) correctly consumes read_data.
   ```
   
   Given that this is a minor enhancement, I'd suggest punting on it for the time being. It will expand the scope of this PR and should be done separately.
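
The difference between the two reading styles under `mock_open` can be reproduced with a short sketch. It assumes Python 3.8 or later for the lazy variant, as per the documentation note quoted above; the function names and the file name `connections.env` are illustrative only.

```python
from unittest.mock import mock_open, patch

ENV_DATA = "A=1\n\nB=2\n"


def read_eagerly(path):
    """Read the whole file, then split it into lines (the existing approach)."""
    with open(path) as f:
        return f.read().splitlines()


def read_lazily(path):
    """Iterate over the file object line by line."""
    with open(path) as f:
        return [line.rstrip("\n") for line in f]


with patch("builtins.open", mock_open(read_data=ENV_DATA)):
    eager_lines = read_eagerly("connections.env")

with patch("builtins.open", mock_open(read_data=ENV_DATA)):
    # Relies on mock_open supporting __iter__, which the docs list as
    # added in Python 3.8.
    lazy_lines = read_lazily("connections.env")
```

On an interpreter older than 3.8 only the eager variant would be expected to work against the mock, which is the constraint the comment describes.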







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628451887



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+FILE_PARSERS = {
+    "env": _parse_env_file,
+    "json": _parse_json_file,
+    "yaml": _parse_yaml_file,
+}
+
+
+def _parse_file(file_path: str) -> Dict[str, Any]:
+    """
+    Based on the file extension format, selects a parser, and parses the file.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Map of key (e.g. connection ID) and value.
+    """
+    if not os.path.exists(file_path):
+        raise AirflowException(f"File {file_path} was not found.")
+
+    log.debug("Parsing file: %s", file_path)
+
+    ext = file_path.rsplit(".", 2)[-1].lower()
+
+    if ext not in FILE_PARSERS:
+        raise AirflowException(
+            "Unsupported file format. The file must have the extension .env or .json or .yaml"
+        )
+
+    contents_dict, parse_errors = FILE_PARSERS[ext](file_path)
+
+    log.debug(
+        "Parsed file: len(parse_errors)=%d, len(contents_dict)=%d", len(parse_errors), len(contents_dict)
+    )
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the file.", file_path=file_path, parse_errors=parse_errors
+        )

Review comment:
       No problem for me to do this, but heads up that this is not new code, just cut from `secrets/local_filesystem.py`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-822800538


   I've refactored the functionality that reads data from a file and creates connections from it.
   
   - the helper functions to parse the contents of JSON, YAML and env files are moved from `airflow/secrets/local_filesystem.py` to `airflow/utils/parse.py`, as suggested by @mik-laj above
   - there is a new method `Connection.from_dict`
   - the connections import CLI function uses the parse utils and `Connection.from_dict` to import connections from a file
   
   I was as conservative as possible in refactoring the secrets backend in this PR, but it seems like not much depends on it. I'd be open to doing some broader cleanup there in a follow-up PR.
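
To make the shape of the moved helpers concrete, here is a minimal, simplified sketch of extension-based parser dispatch. It is not the code from this PR: `parse_json_text`, `parse_env_text`, `PARSERS` and `parse_file` are hypothetical names, YAML is left out to keep the example standard-library only, and comment detection is assumed to be a plain `#` prefix check rather than Airflow's `COMMENT_PATTERN`.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict


def parse_json_text(text: str) -> Dict[str, Any]:
    """Parse JSON text and require a top-level object."""
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError("The file should contain a JSON object.")
    return data


def parse_env_text(text: str) -> Dict[str, Any]:
    """Parse KEY=VALUE lines, skipping blank lines and '#' comments (assumed comment style)."""
    result: Dict[str, Any] = {}
    for line_no, line in enumerate(text.splitlines(), 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # partition() splits on the first "=" only, so values may contain "=".
        key, sep, value = stripped.partition("=")
        if not sep or not key:
            raise ValueError(f"Line {line_no}: expected KEY=VALUE.")
        result[key] = value
    return result


# Hypothetical registry keyed by file extension.
PARSERS: Dict[str, Callable[[str], Dict[str, Any]]] = {
    ".json": parse_json_text,
    ".env": parse_env_text,
}


def parse_file(file_path: str) -> Dict[str, Any]:
    """Pick a parser from the file extension and return the parsed mapping."""
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in PARSERS:
        raise ValueError(f"Unsupported file extension: {ext!r}")
    with open(file_path) as f:
        return PARSERS[ext](f.read())


# Usage: write a sample file to a temporary directory and parse it.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "connections.json")
    with open(sample_path, "w") as f:
        json.dump({"my_db": {"conn_type": "postgres", "host": "localhost"}}, f)
    parsed = parse_file(sample_path)
```

Keeping the parsers free of any connection-specific logic is what would let both the CLI and the secrets backend share them.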





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, should I avoid any dependencies on `secrets`? The existing function `load_connections_dict` accepts a filepath and relies on helper functions to read it in and deserialize the data. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py`
   - duplicate the same functionality in the cli module
   - add a method in the Connection class that still imports from secrets (which doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633846608



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       I've been poking around on this and running into an issue with the existing tests.
   
   The tests use `unittest.mock.mock_open` to patch the call to `open` ([example](https://github.com/apache/airflow/blob/a73ba4b3af65ff392a05fe09f656a91b51401c4d/tests/secrets/test_local_filesystem.py#L36)). This works when you want to mock reading data from a file using `read()` or `readlines()`, which is what the existing implementation does.
   
   However, `unittest.mock.mock_open` only supports iterating over the lines of a file lazily as of Python 3.8. On earlier versions, the mocked `__iter__` method doesn't let us read the lines lazily in tests.
   
   See here: https://docs.python.org/3/library/unittest.mock.html#mock-open
   ```
   Changed in version 3.8: Added __iter__() to implementation so that iteration (such as in for loops) correctly consumes read_data.
   ```
   
   Given that this is a minor enhancement, I'd suggest punting on it for the time being. It would expand the scope of this PR and is better done separately.
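
The difference can be reproduced with a small standalone sketch. The helper names `read_eagerly` and `read_lazily` and the sample data are made up for illustration; only `mock.patch` and `mock.mock_open` are real library calls.

```python
import sys
from unittest import mock

# Made-up sample data standing in for an env file.
SAMPLE = "CONN_A=mysql://host_a\nCONN_B=postgres://host_b\n"


def read_eagerly(path):
    """Read the whole file at once, as the existing parsers do."""
    with open(path) as f:
        return f.read().splitlines()


def read_lazily(path):
    """Iterate over the file object line by line."""
    with open(path) as f:
        return [line.rstrip("\n") for line in f]


# read() is served from read_data on every supported Python version.
with mock.patch("builtins.open", mock.mock_open(read_data=SAMPLE)):
    eager_lines = read_eagerly("ignored.env")

# Iteration is only served from read_data on Python 3.8 and later.
with mock.patch("builtins.open", mock.mock_open(read_data=SAMPLE)):
    lazy_lines = read_lazily("ignored.env")

print(sys.version_info[:2], eager_lines, lazy_lines)
```

On Python 3.8 or later both lists should match. On older interpreters the lazy variant is expected to come back without the sample lines, which is the test breakage described above.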







[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r616074748



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       You can do it in this PR. 







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-850459876


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615929569



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       What do you think about adding this as a static method on the Connection class, `Connection.from_dict`? I don't think the CLI should depend on a secrets backend. 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628451887



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+FILE_PARSERS = {
+    "env": _parse_env_file,
+    "json": _parse_json_file,
+    "yaml": _parse_yaml_file,
+}
+
+
+def _parse_file(file_path: str) -> Dict[str, Any]:
+    """
+    Based on the file extension format, selects a parser, and parses the file.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Map of key (e.g. connection ID) and value.
+    """
+    if not os.path.exists(file_path):
+        raise AirflowException(f"File {file_path} was not found.")
+
+    log.debug("Parsing file: %s", file_path)
+
+    ext = file_path.rsplit(".", 2)[-1].lower()
+
+    if ext not in FILE_PARSERS:
+        raise AirflowException(
+            "Unsupported file format. The file must have the extension .env or .json or .yaml"
+        )
+
+    contents_dict, parse_errors = FILE_PARSERS[ext](file_path)
+
+    log.debug(
+        "Parsed file: len(parse_errors)=%d, len(contents_dict)=%d", len(parse_errors), len(contents_dict)
+    )
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the file.", file_path=file_path, parse_errors=parse_errors
+        )

Review comment:
       No problem for me to do this, but heads up that this is not new code, just cut from `secrets/local_filesystem.py`. Not sure if there was a reason to do it this way when it was originally committed.







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r640682833



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -28,9 +28,9 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.parse import parse_file

Review comment:
       Got rid of `utils/parse.py` and left the functionality where it was in `secrets/local_filesystem.py`.







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r629299429



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(
+                f"The object have illegal keys: {illegal_keys_list}. "

Review comment:
       ```suggestion
                   f"The connection dict has illegal keys: {illegal_keys_list}. "
   ```

##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(

Review comment:
       ```suggestion
               raise TypeError(
   ```

##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "

Review comment:
       ```suggestion
               raise TypeError(
                   f"Unexpected conn_dict type: {type(conn_dict).__name__}. "
   ```
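
Taken together, the three suggestions above could look roughly like the sketch below. `SimpleConnection` is a hypothetical stand-in with a reduced set of constructor parameters, not Airflow's `Connection` model, and `parameter_names` is an assumed helper name.

```python
from inspect import signature
from typing import Any, Dict, Optional, Set


class SimpleConnection:
    """Hypothetical stand-in for the Connection model (not Airflow's class)."""

    def __init__(
        self,
        conn_id: str,
        conn_type: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host
        self.login = login
        self.password = password
        self.port = port

    @classmethod
    def parameter_names(cls) -> Set[str]:
        """Return the constructor parameter names, excluding ``self``."""
        return {name for name in signature(cls.__init__).parameters if name != "self"}

    @classmethod
    def from_dict(cls, conn_id: str, conn_dict: Dict[str, Any]) -> "SimpleConnection":
        """Build a connection from a dictionary, rejecting bad input with TypeError."""
        if not isinstance(conn_dict, dict):
            raise TypeError(
                f"Unexpected conn_dict type: {type(conn_dict).__name__}. "
                "The connection must be defined as a dictionary."
            )
        illegal_keys = set(conn_dict) - cls.parameter_names()
        if illegal_keys:
            illegal_keys_list = ", ".join(sorted(illegal_keys))
            raise TypeError(f"The connection dict has illegal keys: {illegal_keys_list}.")
        # The explicit conn_id argument wins over any "conn_id" key in the dict.
        kwargs = {key: value for key, value in conn_dict.items() if key != "conn_id"}
        return cls(conn_id=conn_id, **kwargs)


# Usage with made-up values.
conn = SimpleConnection.from_dict("my_db", {"conn_type": "postgres", "host": "localhost", "port": 5432})
```

Raising `TypeError` here mirrors what Python itself raises when a function receives an unexpected keyword argument.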

##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       > in case it had been originally committed this way intentionally
   
   Very unlikely to have been intentional.







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834789740


   [The Workflow run](https://github.com/apache/airflow/actions/runs/821600074) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r616005330



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Yes. We should avoid the dependency on secrets. We already have a lot of recursive imports between the various connection modules, secrets, and others, so creating new modules will help untangle those situations a bit.
   
   > move all these helper functions outside of secrets/local_filesystem.py
   
   I think it is the best solution. 







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633468801



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       > in case it had been originally committed this way intentionally
   
   Very unlikely to have been intentional.







[GitHub] [airflow] natanweinberger commented on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-822800538


   I've refactored the functionality that reads data from a file and creates connections from it.
   
   - the helper functions to parse JSON, YAML and env files are now located in `airflow/utils/parse.py`
   - there is a new method `Connection.from_dict`
   - the connections import CLI function uses the util functions and `Connection.from_dict` to import connections from a file
   
   I was as conservative as possible in refactoring the secrets backend in this PR, but it seems like there's not much that uses it. I'd be open to doing some broader clean up there as a follow-up PR.
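
To make the `Connection.from_dict` idea concrete, here is a minimal sketch of a classmethod that validates the dictionary keys against the constructor signature before building the object. The `Connection` class below is a simplified stand-in, not the real Airflow model, and the `parameter_names` helper and the exception types are assumptions made for illustration.

```python
from inspect import signature
from typing import Any, Dict, Optional, Set


class Connection:
    """Simplified stand-in for airflow.models.connection.Connection (assumption)."""

    def __init__(
        self,
        conn_id: Optional[str] = None,
        conn_type: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host
        self.login = login
        self.password = password
        self.port = port

    @classmethod
    def parameter_names(cls) -> Set[str]:
        """Return the constructor parameter names, excluding ``self``."""
        return {name for name in signature(cls.__init__).parameters if name != "self"}

    @classmethod
    def from_dict(cls, conn_id: str, conn_dict: Dict[str, Any]) -> "Connection":
        """Build a connection from a dict, rejecting keys the constructor does not accept."""
        if not isinstance(conn_dict, dict):
            raise TypeError(f"Unexpected conn_dict type: {type(conn_dict)}")
        illegal_keys = set(conn_dict) - cls.parameter_names()
        if illegal_keys:
            raise ValueError(
                f"Unexpected keys for {conn_id}: {', '.join(sorted(illegal_keys))}"
            )
        # The explicit conn_id wins over any "conn_id" entry inside the dict.
        kwargs = {**conn_dict, "conn_id": conn_id}
        return cls(**kwargs)
```

Rejecting unknown keys up front gives a clearer error than the `TypeError` that `Connection(**conn_dict)` would raise on an unexpected keyword argument.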
   
   @mik-laj 





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628457738



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

Review comment:
       ✅

##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+FILE_PARSERS = {
+    "env": _parse_env_file,
+    "json": _parse_json_file,
+    "yaml": _parse_yaml_file,
+}
+
+
+def _parse_file(file_path: str) -> Dict[str, Any]:

Review comment:
       ✅







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834672218


   Hey @ashb, thanks for taking a look! I appreciate it.
   
   I want to point out that some changes discussed above are based on feedback from @mik-laj in this PR. I can change or undo some of these things, but it'll be undoing some of the work that was based on that feedback, so I just want to make sure we're all on the same page.
   
   Moving the secrets parsing logic to a utils file: https://github.com/apache/airflow/pull/15425#discussion_r625638195
   I haven't changed much (if any) code in the parsing, just moved it to a new file (`utils/parse.py`) so that it's not coupled to the secrets backend. Effectively, cut and pasted. I can commit those suggested changes that improve efficiency or wording of it here though.
   
   Add a method `Connection.from_dict()`: https://github.com/apache/airflow/pull/15425#discussion_r615929569
   
   What do you think? Do you want me to revert anything?





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628453598



##########
File path: airflow/secrets/local_filesystem.py
##########
@@ -263,19 +71,32 @@ def load_connections_dict(file_path: str) -> Dict[str, Any]:
     :return: A dictionary where the key contains a connection ID and the value contains the connection.
     :rtype: Dict[str, airflow.models.connection.Connection]
     """
+    from airflow.models.connection import Connection
+
     log.debug("Loading connection")
 
-    secrets: Dict[str, Any] = _parse_secret_file(file_path)
+    secrets: Dict[str, Any] = _parse_file(file_path)
     connection_by_conn_id = {}
     for key, secret_values in list(secrets.items()):
         if isinstance(secret_values, list):
-            if len(secret_values) > 1:
+            # secret_values is either length 0, 1 or 2+ -- only length 1 is valid
+            if not secret_values:
+                log.debug("No secret values for %s", key)
+                continue
+
+            if len(secret_values) >= 2:
                 raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
 
-            for secret_value in secret_values:
-                connection_by_conn_id[key] = _create_connection(key, secret_value)
+            # secret_values must be of length one, so unpack it
+            elif secret_values:
+                secret_values = secret_values[0]
+
+        if isinstance(secret_values, dict):
+            connection_by_conn_id[key] = Connection.from_dict(key, secret_values)

Review comment:
       Creating a method `Connection.from_dict` was a suggestion from @mik-laj: https://github.com/apache/airflow/pull/15425#discussion_r615929569







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-850441089


   @ashb I've switched back to using `load_connections_dict`, which handles the logic for loading the connection values into the `Connection` model. So, the import function just loops through the dict, adds each connection to the session and commits them.
   
   This should be very similar now to the first iteration of this PR: just a bugfix for properly treating each value in the `load_connections_dict` result as a `Connection` instance instead of a dict (and updating the tests).
   
   (By the way, taking a closer look at the `connections export` function: it never exports an `extra_dejson` field, so it might be OK to let the import function assume the field is never present. It can only be present if someone manually edits the export to include it.)
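
As a rough illustration of the loop described above, the sketch below imports a dict of already-built connection objects and skips any ID that already exists. Everything here is a stand-in: the `Connection` dataclass is not the Airflow model, a plain dict named `existing` plays the role of the database table, and `import_connections` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Connection:
    """Stand-in for the Airflow Connection model (assumption)."""

    conn_id: str
    conn_type: Optional[str] = None
    host: Optional[str] = None


def import_connections(
    loaded: Dict[str, Connection], existing: Dict[str, Connection]
) -> List[str]:
    """Add loaded connections to ``existing``; return the IDs skipped on collision."""
    skipped = []
    for conn_id, conn in loaded.items():
        if conn_id in existing:
            print(f"Could not import connection {conn_id}: connection already exists.")
            skipped.append(conn_id)
            continue
        # ``conn`` is already a Connection instance, so it is stored as-is;
        # calling ``conn.items()`` here would raise AttributeError.
        existing[conn_id] = conn
    return skipped
```

The point of the sketch is only that the loop body needs no key filtering once the loader has already produced model instances.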





[GitHub] [airflow] natanweinberger commented on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-826897331


   @mik-laj I've incorporated your feedback, can you re-review when you have a chance? 🙏





[GitHub] [airflow] mik-laj commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615974735



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Ahhh. You don't want to change the visibility of this method, but you are using a different public method. We should therefore still create the `Connection.from_dict` method but as a separate PR.







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-843632088


   [The Workflow run](https://github.com/apache/airflow/actions/runs/854997422) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628452701



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       Same as above. I can do this, but it's existing code from `local_filesystem.py`. Just a heads up in case it had been originally committed this way intentionally; I don't have much context.
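
For reference, here is a small self-contained sketch of the same kind of `.env` line loop: skip blank lines and comments, then split each remaining line into key and value. It is not the code from `local_filesystem.py`. The `COMMENT_PATTERN` below is an assumed stand-in for the one in `airflow.utils.file`, `parse_env_text` is a hypothetical name, and errors are plain `(line_no, message)` tuples rather than `FileSyntaxError`. It also uses `str.partition`, which is my choice and differs from the `line.split("=", 2)` call in the diff; with `partition`, a value that itself contains `=` (such as a URI query string) stays in one piece.

```python
import re
from collections import defaultdict
from typing import Dict, List, Tuple

# Assumed stand-in for airflow.utils.file.COMMENT_PATTERN.
COMMENT_PATTERN = re.compile(r"\s*#.*")


def parse_env_text(content: str) -> Tuple[Dict[str, List[str]], List[Tuple[int, str]]]:
    """Parse KEY=VALUE lines; return (values by key, list of (line_no, message) errors)."""
    values: Dict[str, List[str]] = defaultdict(list)
    errors: List[Tuple[int, str]] = []
    for line_no, line in enumerate(content.splitlines(), 1):
        if not line or COMMENT_PATTERN.match(line):
            # Ignore empty lines and comments.
            continue
        # Split on the first "=" only, so "=" inside the value is preserved.
        key, sep, value = line.partition("=")
        if not sep:
            errors.append((line_no, 'missing "="'))
            continue
        if not key:
            errors.append((line_no, "empty key"))
            continue
        values[key].append(value)
    return dict(values), errors
```

Folding the blank-line and comment checks into one condition is a readability choice only; behavior is the same as two separate `continue` branches.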







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633903623



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(

Review comment:
       Sounds good 👍







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834672218


   Hey @ashb, thanks for taking a look! I appreciate it.
   
   I want to point out that most changes here are based on the feedback from @mik-laj above in this PR. I can change or undo some of these things, but it'll be undoing some of the work that was based on that feedback, so I just want to make sure we're all on the same page.
   
   Moving the secrets parsing logic to a utils file: https://github.com/apache/airflow/pull/15425#discussion_r625638195
   I haven't changed much (if any) code in the parsing, just moved it to a new file (`utils/parse.py`) so that it's not coupled to the secrets backend. Effectively, cut and pasted. I can commit those suggested changes that improve efficiency or wording of it here though.
   
   Add a method `Connection.from_dict()`: https://github.com/apache/airflow/pull/15425#discussion_r615929569
   
   What do you think? Do you want me to revert anything?





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r616066366



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Sounds good, I'll move this logic elsewhere. Do you want me to do that as a separate PR then or in this one?







[GitHub] [airflow] natanweinberger commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-849688714


   @ashb Thanks for the feedback. As we discussed, I simplified this PR a lot by reverting some of the changes and focusing on the bugfix.
   
   Here's a summary of the changes:
   
   - the `Connection` constructor handles non-string values in the `extra` kwarg, for the connections import functionality
   - the connections import function uses the existing file parsing logic and passes the loaded dicts to `Connection.__init__()`
   - fixed a test to catch the issue that slipped through originally (root cause is summarized in the first comment in this PR)
   
   Much simpler!
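
One way to picture the first bullet: when a JSON or YAML file supplies `extra` as a nested object rather than a string, it can be serialized before being stored. The sketch below is only an assumption about the general approach. How the real constructor does it is not shown in this thread, and `normalize_extra` is a hypothetical helper name.

```python
import json
from typing import Any, Optional


def normalize_extra(extra: Any) -> Optional[str]:
    """Return ``extra`` as a string, JSON-encoding anything that is not already one."""
    if extra is None or isinstance(extra, str):
        # Strings (and a missing value) pass through unchanged.
        return extra
    return json.dumps(extra)
```

For example, `normalize_extra({"a": 1})` and `normalize_extra('{"a": 1}')` both yield the string `{"a": 1}`.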





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, should I avoid any dependencies on `secrets`? The existing function `load_connections_dict` accepts a filepath to a json, yaml or env file and relies on helper functions to read it in. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py`
   - duplicate the same functionality in the cli module
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r625985175



##########
File path: airflow/models/connection.py
##########
@@ -364,3 +364,14 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def from_dict(cls, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        return Connection(**conn_dict)

Review comment:
       All set!







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r641511963



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -238,39 +238,24 @@ def connections_delete(args):
 
 @cli_utils.action_logging
 def connections_import(args):
-    """Imports connections from a given file"""
+    """Imports connections from a file"""
     if os.path.exists(args.file):
         _import_helper(args.file)
     else:
         raise SystemExit("Missing connections file.")
 
 
 def _import_helper(file_path):
-    """Helps import connections from a file"""
-    connections_dict = load_connections_dict(file_path)
+    """Load connections from a file and save them to the DB. On collision, skip."""
+    connections_dict = _parse_secret_file(file_path)
     with create_session() as session:
-        for conn_id, conn_values in connections_dict.items():
+        for conn_id, conn_dict in connections_dict.items():
             if session.query(Connection).filter(Connection.conn_id == conn_id).first():
                 print(f'Could not import connection {conn_id}: connection already exists.')
                 continue
 
-            allowed_fields = [
-                'extra',
-                'description',
-                'conn_id',
-                'login',
-                'conn_type',
-                'host',
-                'password',
-                'schema',
-                'port',
-                'uri',
-                'extra_dejson',
-            ]
-            filtered_connection_values = {
-                key: value for key, value in conn_values.items() if key in allowed_fields
-            }
-            connection = _create_connection(conn_id, filtered_connection_values)
+            # Add the connection to the DB
+            connection = Connection(conn_id, **dict(conn_dict.items()))

Review comment:
       ```suggestion
               connection = Connection(conn_id, **conn_dict)
   ```
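
A minimal sketch of why this suggestion should be behavior-preserving: `dict(d.items())` only builds a shallow copy of `d`, so unpacking either one passes the same keyword arguments. `make_connection` below is a hypothetical stand-in for the `Connection` constructor, not Airflow code.

```python
def make_connection(conn_id, **kwargs):
    """Hypothetical stand-in for the Connection constructor; echoes what it received."""
    return {"conn_id": conn_id, **kwargs}


conn_dict = {"conn_type": "postgres", "host": "localhost", "port": 5432}

# The form in the diff: copy the dict through .items() first, then unpack it.
verbose = make_connection("my_db", **dict(conn_dict.items()))

# The suggested form: unpack the dict directly.
concise = make_connection("my_db", **conn_dict)
```

Both calls receive identical keyword arguments, so the extra copy adds nothing.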







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r625666374



##########
File path: airflow/models/connection.py
##########
@@ -364,3 +364,14 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def from_dict(cls, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        return Connection(**conn_dict)

Review comment:
       Sure, will do.







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628457688



##########
File path: airflow/secrets/local_filesystem.py
##########
@@ -263,19 +71,32 @@ def load_connections_dict(file_path: str) -> Dict[str, Any]:
     :return: A dictionary where the key contains a connection ID and the value contains the connection.
     :rtype: Dict[str, airflow.models.connection.Connection]
     """
+    from airflow.models.connection import Connection
+
     log.debug("Loading connection")
 
-    secrets: Dict[str, Any] = _parse_secret_file(file_path)
+    secrets: Dict[str, Any] = _parse_file(file_path)
     connection_by_conn_id = {}
     for key, secret_values in list(secrets.items()):
         if isinstance(secret_values, list):
-            if len(secret_values) > 1:
+            # secret_values is either length 0, 1 or 2+ -- only length 1 is valid
+            if not secret_values:
+                log.debug("No secret values for %s", key)
+                continue
+
+            if len(secret_values) >= 2:
                 raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
 
-            for secret_value in secret_values:
-                connection_by_conn_id[key] = _create_connection(key, secret_value)
+            # secret_values must be of length one, so unpack it
+            elif secret_values:
+                secret_values = secret_values[0]
+
+        if isinstance(secret_values, dict):
+            connection_by_conn_id[key] = Connection.from_dict(key, secret_values)
+        elif isinstance(secret_values, str):
+            connection_by_conn_id[key] = Connection(uri=secret_values)
         else:
-            connection_by_conn_id[key] = _create_connection(key, secret_values)
+            raise AirflowException(f"Unexpected value type: {type(secret_values)}.")

Review comment:
       ✅







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633847669



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(

Review comment:
       Oh, ValueError perhaps then?







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r616066366



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Sounds good, I'll move this logic elsewhere. Do you still want that as a separate PR then or included in this one?







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r640553314



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN

Review comment:
       ```suggestion
   from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
   from airflow.utils import yaml
   from airflow.utils.file import COMMENT_PATTERN
   ```

##########
File path: airflow/secrets/local_filesystem.py
##########
@@ -263,19 +71,32 @@ def load_connections_dict(file_path: str) -> Dict[str, Any]:
     :return: A dictionary where the key contains a connection ID and the value contains the connection.
     :rtype: Dict[str, airflow.models.connection.Connection]
     """
+    from airflow.models.connection import Connection
+
     log.debug("Loading connection")
 
-    secrets: Dict[str, Any] = _parse_secret_file(file_path)
+    secrets: Dict[str, Any] = _parse_file(file_path)
     connection_by_conn_id = {}
     for key, secret_values in list(secrets.items()):
         if isinstance(secret_values, list):
-            if len(secret_values) > 1:
+            # secret_values is either length 0, 1 or 2+ -- only length 1 is valid
+            if not secret_values:
+                log.debug("No secret values for %s", key)
+                continue
+
+            if len(secret_values) >= 2:
                 raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
 
-            for secret_value in secret_values:
-                connection_by_conn_id[key] = _create_connection(key, secret_value)
+            # secret_values must be of length one, so unpack it
+            elif secret_values:
+                secret_values = secret_values[0]
+
+        if isinstance(secret_values, dict):
+            connection_by_conn_id[key] = Connection.from_dict(key, secret_values)

Review comment:
       Oh, this handles the extra too, which `**secret_values` wouldn't.

##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -28,9 +28,9 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.parse import parse_file

Review comment:
       Hmmmm, I'm not really happy with this module name -- it doesn't _really_ tell me what it's doing, not in specific enough terms.
   
   We've already got `airflow.utils.file` that we could include this in -- I think I prefer that over a new file.
   
   "parse_file" is also a bit ambiguous, but I can't think of a better name.







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r641515257



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -238,39 +238,24 @@ def connections_delete(args):
 
 @cli_utils.action_logging
 def connections_import(args):
-    """Imports connections from a given file"""
+    """Imports connections from a file"""
     if os.path.exists(args.file):
         _import_helper(args.file)
     else:
         raise SystemExit("Missing connections file.")
 
 
 def _import_helper(file_path):
-    """Helps import connections from a file"""
-    connections_dict = load_connections_dict(file_path)
+    """Load connections from a file and save them to the DB. On collision, skip."""
+    connections_dict = _parse_secret_file(file_path)

Review comment:
       Wait, this shouldn't change, it should still call `load_connections_dict`, but we should change _that_ function to call.
   
   
   _wait_. Looking at load_connections_dict, I see it is _already_ returning a dict with values of Connection objects!
   
   Did this cli command actually work?!
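
For context, the failure this PR set out to fix can be reproduced in isolation. This is a minimal sketch: `FakeConnection` and `fake_load_connections_dict` are hypothetical stand-ins for the model and the loader, and the only assumption is that the loader returns model instances rather than plain dictionaries.

```python
class FakeConnection:
    """Hypothetical stand-in for a model instance; note it has no .items() method."""

    def __init__(self, conn_id, conn_type=None, host=None):
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host


def fake_load_connections_dict():
    """Hypothetical loader: maps each connection ID to an object, not to a dict."""
    return {"my_db": FakeConnection("my_db", conn_type="postgres", host="localhost")}


connections = fake_load_connections_dict()

try:
    for conn_id, conn_values in connections.items():
        # Treating the value as a dict fails, because it is a model instance.
        conn_values.items()
    failed = False
except AttributeError:
    failed = True
```

The outer `.items()` call succeeds because the container is a dict; only the per-connection call fails.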







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633818993



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(

Review comment:
       Agreed with the same change on line 397, but this particular case shouldn't be a TypeError. It indicates that some keys in the dictionary aren't allowed. Maybe it can be a different type of exception though, what do you think?
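
One way to sketch the distinction under discussion: a non-dict argument is a type problem, while a dict with disallowed keys has the right type but a bad value, for which a built-in such as `ValueError` is one candidate. `ALLOWED_KEYS` and `validate_conn_dict` are hypothetical names used only for illustration; this is not the code in the PR.

```python
from typing import Any, Dict

# Hypothetical allow-list; the PR derives the real one from the constructor signature.
ALLOWED_KEYS = {"conn_type", "host", "login", "password", "schema", "port", "extra"}


def validate_conn_dict(conn_dict: Any) -> Dict[str, Any]:
    """Return conn_dict unchanged if it is a dict containing only allowed keys."""
    if not isinstance(conn_dict, dict):
        # Wrong kind of object altogether.
        raise TypeError(
            f"Unexpected conn_dict type: {type(conn_dict).__name__}. "
            "The connection must be defined as a dictionary."
        )

    illegal_keys = set(conn_dict) - ALLOWED_KEYS
    if illegal_keys:
        # Right kind of object, but its contents are not acceptable.
        raise ValueError(f"Illegal keys: {', '.join(sorted(illegal_keys))}")

    return conn_dict
```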







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633820621



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       Ok, will make this change then!







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r633846608



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       I've been poking around on this and running into an issue with the existing tests.
   
   The tests are using `unittest.mock.mock_open` to patch the call to `open` ([example](https://github.com/apache/airflow/blob/master/tests/secrets/test_local_filesystem.py#L36)). This works when you want to mock reading data from a file using `read()` or `readlines()`, which is the existing implementation.
   
   However, `unittest.mock.mock_open` only supports iterating over the lines of a file lazily as of Python 3.8. On earlier versions, the `__iter__` method doesn't consume `read_data`, so the tests can't read the lines lazily.
   
   See here: https://docs.python.org/3/library/unittest.mock.html#mock-open
   ```
   Changed in version 3.8: Added __iter__() to implementation so that iteration (such as in for loops) correctly consumes read_data.
   ```
   
   Given that this is a minor enhancement, I'd suggest punting on it for the time being. It will expand the scope of this PR and should be done separately.
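
A small sketch of the two reading styles under `mock_open`. `read_lines_eagerly` and `read_lines_lazily` are hypothetical helpers, and the file name is arbitrary; per the documentation quoted above, the lazy variant only sees `read_data` on Python 3.8 or later.

```python
from unittest import mock


def read_lines_eagerly(path):
    """Read the whole file at once, then split it into lines."""
    with open(path) as f:
        return f.read().splitlines()


def read_lines_lazily(path):
    """Iterate over the file handle, one line at a time."""
    with open(path) as f:
        return [line.rstrip("\n") for line in f]


data = "A=1\nB=2\n"

# Each call gets a fresh mock so that read_data has not been consumed yet.
with mock.patch("builtins.open", mock.mock_open(read_data=data)):
    eager = read_lines_eagerly("connections.env")

with mock.patch("builtins.open", mock.mock_open(read_data=data)):
    lazy = read_lines_lazily("connections.env")
```

On Python 3.8 and later both helpers should return the same lines.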







[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r628381353



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+FILE_PARSERS = {
+    "env": _parse_env_file,
+    "json": _parse_json_file,
+    "yaml": _parse_yaml_file,
+}
+
+
+def _parse_file(file_path: str) -> Dict[str, Any]:
+    """
+    Based on the file extension format, selects a parser, and parses the file.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Map of key (e.g. connection ID) and value.
+    """
+    if not os.path.exists(file_path):
+        raise AirflowException(f"File {file_path} was not found.")
+
+    log.debug("Parsing file: %s", file_path)
+
+    ext = file_path.rsplit(".", 2)[-1].lower()
+
+    if ext not in FILE_PARSERS:
+        raise AirflowException(
+            "Unsupported file format. The file must have the extension .env or .json or .yaml"
+        )
+
+    contents_dict, parse_errors = FILE_PARSERS[ext](file_path)
+
+    log.debug(
+        "Parsed file: len(parse_errors)=%d, len(contents_dict)=%d", len(parse_errors), len(contents_dict)
+    )
+
+    if parse_errors:
+        raise AirflowFileParseException(
+            "Failed to load the file.", file_path=file_path, parse_errors=parse_errors
+        )

Review comment:
       Just make the format specific parsers throw the errors directly?
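
A rough sketch of what this could look like for the JSON case, assuming a hypothetical `FileParseError` exception and a hypothetical `parse_json_text` function that takes the file contents as a string. It is an illustration of the idea, not the code in the PR: instead of returning a `(result, errors)` tuple, the parser raises as soon as it finds a problem.

```python
import json


class FileParseError(Exception):
    """Hypothetical exception that records the offending line number."""

    def __init__(self, message, line_no):
        super().__init__(f"line {line_no}: {message}")
        self.line_no = line_no


def parse_json_text(content: str) -> dict:
    """Parse JSON text into a dict, raising FileParseError on any problem."""
    if not content:
        raise FileParseError("The file is empty.", line_no=1)
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as e:
        # Re-raise with the line number reported by the JSON decoder.
        raise FileParseError(e.msg, line_no=e.lineno) from e
    if not isinstance(parsed, dict):
        raise FileParseError("The file should contain an object.", line_no=1)
    return parsed
```

The caller then no longer needs to inspect an error list before using the result.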

##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue

Review comment:
       This loads the whole file into memory, which is slightly wasteful. Instead, we should do something like this:
   
   ```python
       with open(file_path) as f:
           for line_no, line in enumerate(f, 1):
               if not line.strip():
                   continue
   ```
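
A minimal, self-contained sketch of the streaming approach suggested above, not the code that was merged. `parse_env_lines` and `COMMENT_RE` are hypothetical names (the latter only stands in for `airflow.utils.file.COMMENT_PATTERN`, whose exact pattern is not shown in this thread), and errors are plain `(line_no, message)` tuples rather than `FileSyntaxError` objects. Iterating over a file object yields one line at a time, so each line still carries its trailing newline and has to be stripped before it is split:

```python
import io
import re
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumption: a simple "line starts with #" rule, standing in for COMMENT_PATTERN.
COMMENT_RE = re.compile(r"\s*#")


def parse_env_lines(lines: Iterable[str]) -> Tuple[Dict[str, List[str]], List[Tuple[int, str]]]:
    """Parse KEY=VALUE lines lazily, collecting values and syntax errors."""
    values: Dict[str, List[str]] = defaultdict(list)
    errors: List[Tuple[int, str]] = []
    for line_no, raw_line in enumerate(lines, 1):
        line = raw_line.rstrip("\r\n")
        if not line.strip() or COMMENT_RE.match(line):
            # Skip blank lines and comments.
            continue
        # Only the first "=" separates key from value.
        key, sep, value = line.partition("=")
        if not sep:
            errors.append((line_no, 'Missing "=".'))
            continue
        if not key:
            errors.append((line_no, "Key is empty."))
            continue
        values[key].append(value)
    return dict(values), errors


# Any iterable of lines works, including an open file object.
sample = io.StringIO("# comment\n\nA=b://c?x=1&y=2\nno_equals\n=v\nA=second\n")
values, errors = parse_env_lines(sample)
```

Because `str.partition` splits on the first `=` only, a URI value that contains query parameters stays intact in this sketch.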

##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]

Review comment:
       ```suggestion
           return {}, [FileSyntaxError(line_no=1, message="The file should contain an object.")]
   ```

##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN
+
+log = logging.getLogger(__name__)
+
+
+def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the ``.env`` format.
+
+    .. code-block:: text
+
+        MY_CONN_ID=my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    contents_dict: Dict[str, List[str]] = defaultdict(list)
+    errors: List[FileSyntaxError] = []
+    for line_no, line in enumerate(content.splitlines(), 1):
+        if not line:
+            # Ignore empty line
+            continue
+
+        if COMMENT_PATTERN.match(line):
+            # Ignore comments
+            continue
+
+        var_parts: List[str] = line.split("=", 2)
+        if len(var_parts) != 2:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message='Invalid line format. The line should contain at least one equal sign ("=").',
+                )
+            )
+            continue
+
+        key, value = var_parts
+        if not key:
+            errors.append(
+                FileSyntaxError(
+                    line_no=line_no,
+                    message="Invalid line format. Key is empty.",
+                )
+            )
+        contents_dict[key].append(value)
+    return contents_dict, errors
+
+
+def _parse_yaml_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSyntaxError]]:
+    """
+    Parse a file in the YAML format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = yaml.safe_load(content)
+
+    except yaml.MarkedYAMLError as e:
+        return {}, [FileSyntaxError(line_no=e.problem_mark.line, message=str(e))]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+def _parse_json_file(file_path: str) -> Tuple[Dict[str, Any], List[FileSyntaxError]]:
+    """
+    Parse a file in the JSON format.
+
+    :param file_path: The location of the file that will be processed.
+    :type file_path: str
+    :return: Tuple with mapping of key and list of values and list of syntax errors
+    """
+    with open(file_path) as f:
+        content = f.read()
+
+    if not content:
+        return {}, [FileSyntaxError(line_no=1, message="The file is empty.")]
+    try:
+        contents_dict = json.loads(content)
+    except JSONDecodeError as e:
+        return {}, [FileSyntaxError(line_no=int(e.lineno), message=e.msg)]
+    if not isinstance(contents_dict, dict):
+        return {}, [FileSyntaxError(line_no=1, message="The file should contain the object.")]
+
+    return contents_dict, []
+
+
+FILE_PARSERS = {
+    "env": _parse_env_file,
+    "json": _parse_json_file,
+    "yaml": _parse_yaml_file,
+}
+
+
+def _parse_file(file_path: str) -> Dict[str, Any]:

Review comment:
       Since this is used outside of this module, it should be "non-private":
   
   ```suggestion
   def parse_file(file_path: str) -> Dict[str, Any]:
   ```

##########
File path: airflow/secrets/local_filesystem.py
##########
@@ -263,19 +71,32 @@ def load_connections_dict(file_path: str) -> Dict[str, Any]:
     :return: A dictionary where the key contains a connection ID and the value contains the connection.
     :rtype: Dict[str, airflow.models.connection.Connection]
     """
+    from airflow.models.connection import Connection
+
     log.debug("Loading connection")
 
-    secrets: Dict[str, Any] = _parse_secret_file(file_path)
+    secrets: Dict[str, Any] = _parse_file(file_path)
     connection_by_conn_id = {}
     for key, secret_values in list(secrets.items()):
         if isinstance(secret_values, list):
-            if len(secret_values) > 1:
+            # secret_values is either length 0, 1 or 2+ -- only length 1 is valid
+            if not secret_values:
+                log.debug("No secret values for %s", key)
+                continue
+
+            if len(secret_values) >= 2:
                 raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
 
-            for secret_value in secret_values:
-                connection_by_conn_id[key] = _create_connection(key, secret_value)
+            # secret_values must be of length one, so unpack it
+            elif secret_values:
+                secret_values = secret_values[0]
+
+        if isinstance(secret_values, dict):
+            connection_by_conn_id[key] = Connection.from_dict(key, secret_values)
+        elif isinstance(secret_values, str):
+            connection_by_conn_id[key] = Connection(uri=secret_values)
         else:
-            connection_by_conn_id[key] = _create_connection(key, secret_values)
+            raise AirflowException(f"Unexpected value type: {type(secret_values)}.")

Review comment:
       ```suggestion
               raise AirflowException(f"Unexpected value type: {type(secret_values).__name__}.")
   ```
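
For illustration, the effect of this suggestion on the error message can be seen in a short snippet. The variable names here are made up for the example:

```python
secret_values = 42  # any value that is not a list, dict or str

# Formatting the type object itself produces its repr.
verbose = f"Unexpected value type: {type(secret_values)}."
# Formatting __name__ produces just the class name.
concise = f"Unexpected value type: {type(secret_values).__name__}."

print(verbose)  # Unexpected value type: <class 'int'>.
print(concise)  # Unexpected value type: int.
```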

##########
File path: airflow/secrets/local_filesystem.py
##########
@@ -263,19 +71,32 @@ def load_connections_dict(file_path: str) -> Dict[str, Any]:
     :return: A dictionary where the key contains a connection ID and the value contains the connection.
     :rtype: Dict[str, airflow.models.connection.Connection]
     """
+    from airflow.models.connection import Connection
+
     log.debug("Loading connection")
 
-    secrets: Dict[str, Any] = _parse_secret_file(file_path)
+    secrets: Dict[str, Any] = _parse_file(file_path)
     connection_by_conn_id = {}
     for key, secret_values in list(secrets.items()):
         if isinstance(secret_values, list):
-            if len(secret_values) > 1:
+            # secret_values is either length 0, 1 or 2+ -- only length 1 is valid
+            if not secret_values:
+                log.debug("No secret values for %s", key)
+                continue
+
+            if len(secret_values) >= 2:
                 raise ConnectionNotUnique(f"Found multiple values for {key} in {file_path}.")
 
-            for secret_value in secret_values:
-                connection_by_conn_id[key] = _create_connection(key, secret_value)
+            # secret_values must be of length one, so unpack it
+            elif secret_values:
+                secret_values = secret_values[0]
+
+        if isinstance(secret_values, dict):
+            connection_by_conn_id[key] = Connection.from_dict(key, secret_values)

Review comment:
       Do we need a method for this? Isn't it the same as
   
   ```suggestion
               connection_by_conn_id[key] = Connection(key, **secret_values)
   ```
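
A sketch of what the suggested call does, using a hypothetical `FakeConnection` dataclass in place of the real `Connection` model (its fields are assumptions chosen for the example). Keyword unpacking passes each dictionary key as a constructor argument, so an unknown key is rejected by the constructor itself with a `TypeError`:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for the Connection model."""

    conn_id: str
    conn_type: Optional[str] = None
    host: Optional[str] = None
    login: Optional[str] = None


key = "my_db"
secret_values = {"conn_type": "postgres", "host": "localhost"}

# Equivalent to FakeConnection("my_db", conn_type="postgres", host="localhost").
conn = FakeConnection(key, **secret_values)

# A misspelled key is not silently ignored.
try:
    FakeConnection(key, **{"conn_type": "postgres", "hots": "localhost"})
    message = None
except TypeError as err:
    message = str(err)
```

The trade-off, under these assumptions, is that the error text comes from Python rather than from a dedicated validation step, so it names only the first offending key.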







[GitHub] [airflow] github-actions[bot] commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-834850802


   [The Workflow run](https://github.com/apache/airflow/actions/runs/821847833) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r640682240



##########
File path: airflow/utils/parse.py
##########
@@ -0,0 +1,168 @@
+"""Parse data from a file if it uses a valid format."""
+import json
+import logging
+import os
+from collections import defaultdict
+from json import JSONDecodeError
+from typing import Any, Dict, List, Tuple
+
+import airflow.utils.yaml as yaml
+from airflow.exceptions import AirflowException, AirflowFileParseException, FileSyntaxError
+from airflow.utils.file import COMMENT_PATTERN

Review comment:
       Got rid of utils/parse.py.







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-826897331


   @mik-laj I've incorporated your feedback. Can you please re-review when you have a chance? 🙏





[GitHub] [airflow] ashb commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r629299429



##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(
+                f"The object have illegal keys: {illegal_keys_list}. "

Review comment:
       ```suggestion
                   f"The connection dict has illegal keys: {illegal_keys_list}. "
   ```

##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "
+                "The connection must be defined as a dictionary."
+            )
+
+        connection_parameter_names = cls.get_connection_parameter_names() | {"extra_dejson"}
+        current_keys = set(conn_dict.keys())
+        if not current_keys.issubset(connection_parameter_names):
+            illegal_keys = current_keys - connection_parameter_names
+            illegal_keys_list = ", ".join(sorted(illegal_keys))
+            raise AirflowException(

Review comment:
       ```suggestion
               raise TypeError(
   ```

##########
File path: airflow/models/connection.py
##########
@@ -377,3 +378,50 @@ def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
             if conn:
                 return conn
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
+
+    @classmethod
+    def get_connection_parameter_names(cls) -> Set[str]:
+        """Returns :class:`airflow.models.connection.Connection` constructor parameters."""
+        return {k for k in signature(cls.__init__).parameters.keys() if k != "self"}
+
+    @classmethod
+    def from_dict(cls, conn_id: str, conn_dict: Dict) -> 'Connection':
+        """
+        Create a connection from a dictionary.
+
+        :param conn_dict: dictionary representing a connection's attributes
+            e.g., {'conn_id': '', 'conn_type': '', 'login': '', ...}
+        :return: connection
+        """
+        if not isinstance(conn_dict, dict):
+            raise AirflowException(
+                f"Unexpected conn_dict type: {type(conn_dict)}. "

Review comment:
       ```suggestion
               raise TypeError(
                   f"Unexpected conn_dict type: {type(conn_dict).__name__}. "
   ```
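
The three suggestions above can be seen together in a self-contained sketch. It is not the merged implementation: `FakeConnection`, `parameter_names` and this module-level `from_dict` are hypothetical stand-ins, and the extra `extra_dejson` key that the diff allows is left out for brevity. The point is that both a wrong argument type and unexpected keys are reported as `TypeError`, with readable messages:

```python
from inspect import signature
from typing import Any, Dict, Optional, Set


class FakeConnection:
    """Hypothetical stand-in for the Connection model."""

    def __init__(
        self,
        conn_id: Optional[str] = None,
        conn_type: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
    ):
        self.conn_id = conn_id
        self.conn_type = conn_type
        self.host = host
        self.login = login


def parameter_names(cls: type) -> Set[str]:
    """Return the constructor parameter names of cls, excluding self."""
    return {name for name in signature(cls.__init__).parameters if name != "self"}


def from_dict(conn_id: str, conn_dict: Dict[str, Any]) -> FakeConnection:
    """Build a FakeConnection after validating the dictionary."""
    if not isinstance(conn_dict, dict):
        raise TypeError(
            f"Unexpected conn_dict type: {type(conn_dict).__name__}. "
            "The connection must be defined as a dictionary."
        )
    illegal_keys = set(conn_dict) - parameter_names(FakeConnection)
    if illegal_keys:
        illegal_keys_list = ", ".join(sorted(illegal_keys))
        raise TypeError(f"The connection dict has illegal keys: {illegal_keys_list}.")
    # The explicit conn_id wins over any conn_id stored in the dictionary.
    return FakeConnection(**{**conn_dict, "conn_id": conn_id})


conn = from_dict("my_db", {"conn_type": "postgres", "host": "localhost"})

messages = []
for bad_input in (["not", "a", "dict"], {"conn_type": "postgres", "hots": "localhost"}):
    try:
        from_dict("my_db", bad_input)
    except TypeError as err:
        messages.append(str(err))
```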







[GitHub] [airflow] ashb merged pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15425:
URL: https://github.com/apache/airflow/pull/15425


   





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, in order to avoid any dependencies on `secrets`, I'll need to move several helper functions. The existing function `load_connections_dict` accepts a filepath and relies on a few other functions to read in the contents and deserialize the data. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py` (if so, into a utils file perhaps)
   - duplicate the same functionality in the cli module (seems like a bad idea)
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-822800538


   I've refactored the functionality that reads data from a file and creates connections from it.
   
   - the helper functions to parse the contents of JSON, YAML and env files are moved from `airflow/secrets/local_filesystem.py` to `airflow/utils/parse.py`, as suggested by @mik-laj above
   - there is a new method `Connection.from_dict`
   - the connections import CLI function uses the parse utils and `Connection.from_dict` to import connections from a file, which eliminates dependency on secrets
   
   I was as conservative as possible in refactoring the secrets backend in this PR, but it seems like there's not much that uses it. I'd be open to doing some broader cleanup there as a follow-up PR.
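
As a rough illustration of the parse utilities described in this comment, the sketch below dispatches on the file extension, in the spirit of the `FILE_PARSERS` mapping in the quoted diff. It is a simplified assumption-laden example: all names are hypothetical, YAML is omitted to keep it standard-library only, and the env parser silently skips malformed lines instead of collecting syntax errors.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict


def _parse_json(path: str) -> Dict[str, Any]:
    with open(path) as f:
        return json.load(f)


def _parse_env(path: str) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    with open(path) as f:
        for line in f:
            # Only the first "=" separates key from value.
            key, sep, value = line.strip().partition("=")
            if sep and key:
                result[key] = value
    return result


# Maps a lowercase file extension to the function that parses it.
PARSERS: Dict[str, Callable[[str], Dict[str, Any]]] = {
    "json": _parse_json,
    "env": _parse_env,
}


def parse_file(path: str) -> Dict[str, Any]:
    """Pick a parser from the file extension and return the parsed mapping."""
    ext = path.rsplit(".", 1)[-1].lower()
    if ext not in PARSERS:
        supported = ", ".join(sorted(PARSERS))
        raise ValueError(f"Unsupported file format: {ext!r}. Expected one of: {supported}.")
    return PARSERS[ext](path)


with tempfile.TemporaryDirectory() as tmp:
    json_path = os.path.join(tmp, "connections.json")
    with open(json_path, "w") as f:
        json.dump({"my_db": {"conn_type": "postgres"}}, f)

    env_path = os.path.join(tmp, "connections.env")
    with open(env_path, "w") as f:
        f.write("MY_DB=postgres://localhost:5432/schema?sslmode=require\n")

    from_json = parse_file(json_path)
    from_env = parse_file(env_path)
```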





[GitHub] [airflow] ashb commented on pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-849583630


   And another idea: we possibly don't even need a new `from_dict` method.
   
   Instead we could make the Connection constructor take an `extra_dejson` argument:
   
   ```python
       def __init__(  # pylint: disable=too-many-arguments
           self,
           ...
           extra: Optional[str] = None,
           uri: Optional[str] = None,
           extra_dejson: Optional[dict] = None,
       ):
           super().__init__()
   
           ...
   
           if extra_dejson:
               self.extra = json.dumps(extra_dejson)
   ```
   
   Though the "de" part of this name is wrong/confusing.
   
   Another option would be to have `Connection()` accept a dict as extra and jsonify it:
   
   ```python
       def __init__(  # pylint: disable=too-many-arguments
           self,
           ...
           extra: Optional[Union[str, dict]] = None,
           uri: Optional[str] = None,
       ):
           super().__init__()
   
           ...
   
           if extra is not None and not isinstance(extra, str):
               extra = json.dumps(extra)
   ```
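
A runnable sketch of the second option, assuming a hypothetical `FakeConnection` class rather than the real model. The constructor accepts `extra` either as a JSON string or as a dictionary and always stores a string, while a read-only `extra_dejson` property (named after the attribute mentioned in this thread) decodes it again:

```python
import json
from typing import Any, Dict, Optional, Union


class FakeConnection:
    """Hypothetical stand-in showing an `extra` that may be str or dict."""

    def __init__(self, conn_id: str, extra: Optional[Union[str, Dict[str, Any]]] = None):
        self.conn_id = conn_id
        # Serialize anything that is not already a string; leave None untouched.
        if extra is not None and not isinstance(extra, str):
            extra = json.dumps(extra)
        self.extra = extra

    @property
    def extra_dejson(self) -> Dict[str, Any]:
        """Return `extra` decoded from JSON, or an empty dict if unset."""
        return json.loads(self.extra) if self.extra else {}


from_dict_extra = FakeConnection("my_db", extra={"sslmode": "require"})
from_str_extra = FakeConnection("my_db", extra='{"sslmode": "require"}')
without_extra = FakeConnection("my_db")
```

Checking for `None` explicitly matters in this sketch: `json.dumps(None)` returns the string `"null"`, which would otherwise be stored as the extra.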
   
   





[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r616066366



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Sounds good, I'll move this logic elsewhere. Thanks @mik-laj 😀







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import and migrate logic from secrets to Connection model

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r641562826



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -238,39 +238,24 @@ def connections_delete(args):
 
 @cli_utils.action_logging
 def connections_import(args):
-    """Imports connections from a given file"""
+    """Imports connections from a file"""
     if os.path.exists(args.file):
         _import_helper(args.file)
     else:
         raise SystemExit("Missing connections file.")
 
 
 def _import_helper(file_path):
-    """Helps import connections from a file"""
-    connections_dict = load_connections_dict(file_path)
+    """Load connections from a file and save them to the DB. On collision, skip."""
+    connections_dict = _parse_secret_file(file_path)

Review comment:
       Nope, I introduced a bug at the last minute. The values of the dict were indeed Connection objects, but I was treating them as dictionaries (and my test didn't catch it). I can change this back to `load_connections_dict`, but I thought this was more straightforward.
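
The way a test can miss this kind of bug is sketched below with hypothetical names (`Loader`, `FakeConnection`, `buggy_import`); it is an illustration of the testing point, not the project's actual test. When the high-level loader is mocked to return raw dictionaries, the code that wrongly calls `.items()` on each value still passes. Mocking only the low-level parser lets the real loader build objects, and the `AttributeError` surfaces:

```python
from unittest import mock


class FakeConnection:
    """Hypothetical stand-in for the Connection model."""

    def __init__(self, conn_id, host=None):
        self.conn_id = conn_id
        self.host = host


class Loader:
    @staticmethod
    def parse_file(path):
        # Low-level step: would read JSON, YAML or env data from disk.
        raise NotImplementedError

    @classmethod
    def load_connections(cls, path):
        # High-level step: turns parsed dictionaries into connection objects.
        return {cid: FakeConnection(cid, **attrs) for cid, attrs in cls.parse_file(path).items()}


def buggy_import(path):
    # Bug: treats each value as a dictionary, although it is a FakeConnection.
    return {cid: dict(conn.items()) for cid, conn in Loader.load_connections(path).items()}


sample = {"my_db": {"host": "localhost"}}

# Mocking the high-level loader with raw dictionaries hides the bug.
with mock.patch.object(Loader, "load_connections", return_value=sample):
    hidden = buggy_import("ignored.json")

# Mocking only the parser keeps the real objects, so the bug shows up.
with mock.patch.object(Loader, "parse_file", return_value=sample):
    try:
        buggy_import("ignored.json")
        surfaced = None
    except AttributeError as err:
        surfaced = str(err)
```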







[GitHub] [airflow] natanweinberger commented on a change in pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger commented on a change in pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#discussion_r615999812



##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -29,7 +29,7 @@
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.secrets.local_filesystem import _create_connection, load_connections_dict
+from airflow.secrets.local_filesystem import load_connections_dict

Review comment:
       Thanks for taking a look. I can add `Connection.from_dict`.
   
   For the implementation, should I avoid any dependencies on `secrets`? The existing function `load_connections_dict` accepts a filepath to a json, yaml or env file and relies on helper functions to read it in and deserialize the data. For example, it calls `_parse_secret_file`, which calls `_parse_[json,yaml,env]_file`.
   
   I can either:
   - move all these helper functions outside of `secrets/local_filesystem.py`
   - duplicate the same functionality in the cli module
   - add a method in the Connection class that still imports from secrets (but doesn't eliminate the dependency)
   
   What do you have in mind? 







[GitHub] [airflow] natanweinberger edited a comment on pull request #15425: Fix CLI connections import

Posted by GitBox <gi...@apache.org>.
natanweinberger edited a comment on pull request #15425:
URL: https://github.com/apache/airflow/pull/15425#issuecomment-822800538


   I've refactored the functionality that reads data from a file and creates connections from it.
   
   - the helper functions to parse the contents of JSON, YAML and env files are moved from `airflow/secrets/local_filesystem.py` to `airflow/utils/parse.py`, as suggested by @mik-laj above
   - there is a new method `Connection.from_dict`
   - the connections import CLI function uses the parse utils and `Connection.from_dict` to import connections from a file
   
   I was as conservative as possible in refactoring the secrets backend in this PR, but it seems like there's not much that uses it. I'd be open to doing some broader cleanup there as a follow-up PR.
   
   @mik-laj 

