Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/26 21:03:27 UTC

[GitHub] [airflow] Taragolis opened a new pull request, #26687: Added information about Amazon Elastic MapReduce Connection

Taragolis opened a new pull request, #26687:
URL: https://github.com/apache/airflow/pull/26687

   
   Right now there is no information about how to use the `Amazon Elastic MapReduce Connection`.
   Personally, I find this connection a bit odd because it only contains parameters for a single [boto3 method](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow).
   However, this connection has existed for a long time and might be in use by someone, so I added:
   - A Connection page
   - Additional information about how `emr_conn_id` and `job_flow_overrides` work together
   - Only the Extra field is kept in the Connection UI
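
A minimal sketch of how `emr_conn_id` and `job_flow_overrides` combine, based on the `copy()` / `update()` pair visible in the `create_job_flow` diff in this thread. The helper name `build_run_job_flow_config` and the sample values are hypothetical and exist only for illustration. Because `dict.update` replaces whole top-level keys, a nested value such as `Instances` from the connection Extra is replaced rather than merged.

```python
from __future__ import annotations

from typing import Any


def build_run_job_flow_config(
    connection_extra: dict[str, Any], job_flow_overrides: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical helper mirroring the merge done in EmrHook.create_job_flow."""
    # Start from the initial configuration stored in the connection Extra.
    config = connection_extra.copy()
    # Top-level keys from the overrides replace the ones from the connection.
    config.update(job_flow_overrides)
    return config


# Sample values are assumptions, not a recommended cluster configuration.
extra = {
    "Name": "default-cluster",
    "Instances": {"InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": False},
}
overrides = {"Instances": {"InstanceCount": 5}}

merged = build_run_job_flow_config(extra, overrides)
print(merged)
```

In this sketch the merged `Instances` value contains only `InstanceCount`, so anything that must survive an override has to be repeated in `job_flow_overrides`.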
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r983563080


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   Change from `"emr"` to `self.conn_type`





[GitHub] [airflow] ferruzzi commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r981455917


##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce Connection

Review Comment:
   Added benefit, if someone is doing a search for "emr" this will now come up.





[GitHub] [airflow] Taragolis commented on pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #26687:
URL: https://github.com/apache/airflow/pull/26687#issuecomment-1259231874

   @ferruzzi much appreciated! Almost all of the changes relate to documentation or to informing users about something that we previously did silently. For example, if the `emr_conn_id` connection did not exist, we just assumed that an empty config should be used, without notifying the user.
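
A small sketch of the "warn instead of staying silent" behaviour described above, written without Airflow so it can run on its own. `ConnectionNotFound`, `STORED_EXTRAS`, and `load_initial_config` are hypothetical stand-ins for `AirflowNotFoundException`, the connection store, and the lookup inside `create_job_flow`; only the use of `warnings.warn` with `UserWarning` follows the diff in this thread.

```python
from __future__ import annotations

import warnings
from typing import Any


class ConnectionNotFound(Exception):
    """Hypothetical stand-in for AirflowNotFoundException."""


# Hypothetical connection store: connection id -> Extra dictionary.
STORED_EXTRAS: dict[str, dict[str, Any]] = {"emr_default": {"Name": "default-cluster"}}


def get_extra(conn_id: str) -> dict[str, Any]:
    try:
        return STORED_EXTRAS[conn_id]
    except KeyError:
        raise ConnectionNotFound(conn_id) from None


def load_initial_config(emr_conn_id: str | None) -> dict[str, Any]:
    """Return the initial config, warning when a named connection is missing."""
    if not emr_conn_id:
        # No connection requested: an empty config is expected, so no warning.
        return {}
    try:
        extra = get_extra(emr_conn_id)
    except ConnectionNotFound:
        warnings.warn(
            f"Unable to find connection {emr_conn_id!r}, using an empty initial configuration.",
            UserWarning,
            stacklevel=2,
        )
        return {}
    return extra.copy()


# Record warnings so the caller can inspect them instead of printing to stderr.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    config = load_initial_config("missing_conn")

print(config, [str(w.message) for w in caught])
```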
   
   Also, today I overrode the `test_connection` method, because right now it uses the AwsBaseHook/AwsGenericHook method, which actually tests the ability to connect to AWS itself with the default credentials.
   
   ---
   ![image](https://user-images.githubusercontent.com/3998685/192486992-0d2dc114-b26f-43e7-8f47-699b67b18b8a.png)
   
   ---
   ![image](https://user-images.githubusercontent.com/3998685/192487771-a04be075-8e95-4252-b7f3-b09b726594f7.png)
   




[GitHub] [airflow] Taragolis commented on pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #26687:
URL: https://github.com/apache/airflow/pull/26687#issuecomment-1260181376

   > Thanks for the contribution, great stuff as always! Just some minor changes requested 👍
   
   Cool. I will add this suggestion in the morning.




[GitHub] [airflow] potiuk merged pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #26687:
URL: https://github.com/apache/airflow/pull/26687




[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982116922


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   First of all, why we need some magic here: `conn_type` is not a mandatory attribute of the class `airflow.models.connection.Connection` and can be an empty string or None.
   
   Actually, this is not easy to achieve. For a connection created in the UI I think it is almost impossible.
   A URI-based connection would have to omit the scheme, something like `://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@`, or for JSON it might be
   ```json
   {
      "login": "AKIAIOSFODNN7EXAMPLE",
      "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
   }
   ```
   
   The Connection object does not provide any way to actually validate `conn_type`. So we could use one of:
   1. `if emr_conn.conn_type and emr_conn.conn_type != "emr":`
   2. `if (emr_conn.conn_type or "emr") != "emr":`
   3. `if emr_conn.conn_type not in ("emr", None):`
   
   Another option: right now in the PR I assume that if the user does not provide any `conn_type`, the user means `emr`. However, we could do it differently and, if it is not set (or empty), assume that something might have gone wrong: `if emr_conn.conn_type != "emr":`
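
To make the difference between these conditions concrete, here is a small standalone comparison. The function names `option_1`, `option_2`, `option_3`, and `strict` are hypothetical labels for the four expressions quoted above; each returns `True` when the check would treat the connection type as unexpected. The sample values are assumptions chosen to cover unset, empty, matching, and mismatching types.

```python
from __future__ import annotations


def option_1(conn_type: str | None) -> bool:
    return bool(conn_type and conn_type != "emr")


def option_2(conn_type: str | None) -> bool:
    return (conn_type or "emr") != "emr"


def option_3(conn_type: str | None) -> bool:
    return conn_type not in ("emr", None)


def strict(conn_type: str | None) -> bool:
    # The alternative that treats a missing conn_type as suspicious.
    return conn_type != "emr"


checks = {"option_1": option_1, "option_2": option_2, "option_3": option_3, "strict": strict}

# For each candidate conn_type, record which checks would flag it.
results = {
    repr(value): [name for name, check in checks.items() if check(value)]
    for value in (None, "", "emr", "aws")
}

for value, flagged_by in results.items():
    print(f"conn_type={value}: flagged by {flagged_by}")
```

In this sketch options 1 and 2 agree on every input, option 3 additionally flags an empty string, and the strict variant flags both `None` and an empty string.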
   
   
   





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r981478005


##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce Connection

Review Comment:
   Good point. Actually, on a daily basis "Elastic MapReduce" is used very rarely, only when introducing this service to someone.





[GitHub] [airflow] ferruzzi commented on pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on PR #26687:
URL: https://github.com/apache/airflow/pull/26687#issuecomment-1258806424

   Fighting with a system test at the moment, I'll check it out tomorrow (Tuesday) :+1: 
   




[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r983917923


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   ![image](https://user-images.githubusercontent.com/3998685/193118197-f7849c4d-0c9a-4a77-bbd1-e267e147e8c9.png)
   
   Looks fine!





[GitHub] [airflow] vincbeck commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r981310250


##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce Connection

Review Comment:
   It might not be obvious for everyone that EMR stands for Amazon Elastic MapReduce
   
   ```suggestion
   Amazon Elastic MapReduce (EMR) Connection
   ```





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982935903


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   > why the connection cannot be tested in the response from the API/UI.
   
   Oh, actually this is by design of this connection. It is closer to an Airflow Variable than to an Airflow Connection.
   
   We just store key-value pairs for https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
   
   So there is actually nothing to test. It is even hardly possible to validate which keys are valid, since this information is stored somewhere in `botocore` and depends on which version of the AWS API it supports.
   
   The main problem is in the logic of the providers manager, which is responsible for deciding whether a Connection is testable or not.
   
   https://github.com/apache/airflow/blob/c94f978a66a7cfc31b6d461bbcbfd0f2ddb2962e/airflow/providers_manager.py#L793-L800
   
   In our case `EmrHook` is based on `AwsGenericHook`, which has a `test_connection` method, so the connection is passed as `emr_conn_id`, which has no effect in `AwsGenericHook.test_connection`.
   
   But I agree that the message in the current PR might not be clear enough for end users.
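
The inheritance issue can be sketched without Airflow. Everything below is a hypothetical stand-in: `BaseHookStandIn` plays the role of `AwsGenericHook`, and `looks_testable` assumes that testability is decided by checking whether the hook class has a `test_connection` attribute, which is my reading of the linked providers manager lines rather than something verified here. The overriding method returns the same tuple shape as the one in the diff.

```python
class BaseHookStandIn:
    """Hypothetical stand-in for AwsGenericHook."""

    def test_connection(self):
        # The real base method checks AWS credentials; this one only reports that it ran.
        return True, "base hook tested default AWS credentials"


class InheritingHook(BaseHookStandIn):
    """Subclass without an override: it silently reuses the base check."""

    hook_name = "Amazon Elastic MapReduce"


class OverridingHook(BaseHookStandIn):
    """Subclass with the override proposed in this PR."""

    hook_name = "Amazon Elastic MapReduce"

    def test_connection(self):
        return False, f"{self.hook_name} Connection cannot be tested."


def looks_testable(hook_class) -> bool:
    # Assumed rule: a hook counts as testable if it exposes test_connection.
    return hasattr(hook_class, "test_connection")


for hook_class in (InheritingHook, OverridingHook):
    print(hook_class.__name__, looks_testable(hook_class), hook_class().test_connection())
```

Both classes look testable under the assumed rule, so the override is the only place where the subclass can report that there is nothing meaningful to test.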





[GitHub] [airflow] josh-fell commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r985013751


##########
tests/providers/amazon/aws/hooks/test_emr.py:
##########
@@ -29,8 +30,8 @@
     mock_emr = None
 
 
-@unittest.skipIf(mock_emr is None, 'moto package not present')

Review Comment:
   Nice! Love the conversion to pytest here.





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r983565140


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   I changed the message, but I am still not sure about it. Would `'Amazon Elastic MapReduce' Airflow Connection cannot be tested by design.` be better?





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982927500


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   > the "emr", is there not an Enum, Constant, etc. somewhere that can be used instead of this string?
   
   Oh, got you. We have something better than an Enum or a constant: the `EmrHook.conn_type` class attribute.
   
   https://github.com/apache/airflow/blob/c94f978a66a7cfc31b6d461bbcbfd0f2ddb2962e/airflow/providers/amazon/aws/hooks/emr.py#L43-L46
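
A minimal sketch of that idea, using a stand-in class rather than the real hook. `FakeEmrHook` and `is_unexpected_conn_type` are hypothetical names introduced only for illustration, and the attribute value `"emr"` is assumed from the literal used in the diff.

```python
class FakeEmrHook:
    """Hypothetical stand-in for EmrHook; only the class attribute matters here."""

    conn_type = "emr"  # assumed value, matching the literal compared against in the diff

    def is_unexpected_conn_type(self, actual):
        # Same condition as in the diff, but without repeating the string literal:
        # an unset (None or empty) conn_type is accepted, anything else must match.
        return bool(actual) and actual != self.conn_type


hook = FakeEmrHook()
print(hook.is_unexpected_conn_type("aws"))
print(hook.is_unexpected_conn_type(None))
```

Comparing against the class attribute keeps the expected type defined in a single place, so a later rename would not leave a stale literal behind.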





[GitHub] [airflow] LaPetiteSouris commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
LaPetiteSouris commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r985073631


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,97 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:

Review Comment:
   Nitpicking: should we have a separate function?
   
   ```python
   def _validate_params_emr_conn_id(emr_conn_id: str):
   ```
   IMHO it increases readability by removing a few levels of indentation.
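
One way the suggestion might look, as a rough sketch only. The helper name `load_initial_config`, the `ConnectionNotFound` exception, and the `lookup` callable are all hypothetical stand-ins; the hook in the diff uses `self.get_connection` and `AirflowNotFoundException` instead. Early returns replace the nested `if`/`try`/`else`.

```python
import warnings


class ConnectionNotFound(Exception):
    """Hypothetical stand-in for AirflowNotFoundException."""


def load_initial_config(emr_conn_id, lookup):
    """Return a copy of the connection's extra dict, or {} when none is usable."""
    if not emr_conn_id:
        return {}
    try:
        extra = lookup(emr_conn_id)
    except ConnectionNotFound:
        warnings.warn(
            f"Unable to find connection {emr_conn_id!r}, "
            "using an empty initial configuration.",
            UserWarning,
            stacklevel=2,
        )
        return {}
    return dict(extra)


def create_job_flow_config(emr_conn_id, job_flow_overrides, lookup):
    # Flat body: the branching now lives in the helper.
    config = load_initial_config(emr_conn_id, lookup)
    config.update(job_flow_overrides)  # overrides win over the stored values
    return config
```

Note that `dict.update` is a shallow merge, so a nested key present in both dictionaries is replaced as a whole rather than merged.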





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r985072491


##########
tests/providers/amazon/aws/hooks/test_emr.py:
##########
@@ -29,8 +30,8 @@
     mock_emr = None
 
 
-@unittest.skipIf(mock_emr is None, 'moto package not present')

Review Comment:
   Yeah, I try to replace `unittest` with `pytest`; in most cases this does not require a huge effort.
   However, there are still a lot of `unittest`-based tests:
   
   ```shell
   # Total number of files which use `unittest.TestCase`
   ❯ grep -rl 'unittest.TestCase' ./tests | wc -l
   528
   
   # By tests packages
   ❯ grep -rl 'unittest.TestCase' ./tests | cut -d"/" -f3 | sort | uniq -c | sort -nr
       379 providers
        36 charts
        23 utils
        20 cli
        10 ti_deps
        10 api_connexion
         6 sensors
         6 operators
         6 executors
         6 core
         5 www
         5 always
         4 models
         3 api
         2 task
         2 kubernetes
         1 security
         1 plugins
         1 macros
         1 hooks
         1 dag_processing
   
   # By provider ("apache", "microsoft", "common" have subpackages which are separate providers)
   ❯ grep -rl 'unittest.TestCase' ./tests/providers/ | cut -d"/" -f5 | sort | uniq -c | sort -nr
       113 google
       101 amazon
        32 apache
        26 microsoft
         6 databricks
         4 redis
         4 mysql
         4 alibaba
         3 trino
         3 tableau
         3 qubole
         3 oracle
         3 jenkins
         3 http
         3 docker
         3 atlassian
         3 arangodb
         3 airbyte
         2 yandex
         2 vertica
         2 telegram
         2 sqlite
         2 snowflake
         2 sftp
         2 segment
         2 salesforce
         2 presto
         2 postgres
         2 opsgenie
         2 neo4j
         2 mongo
         2 jdbc
         2 influxdb
         2 imap
         2 grpc
         2 ftp
         2 exasol
         2 discord
         2 dingding
         2 datadog
         2 common
         2 cncf
         2 asana
         1 ssh
         1 singularity
         1 sendgrid
         1 samba
         1 papermill
         1 openfaas
         1 elasticsearch
         1 cloudant
         1 celery
   ```





[GitHub] [airflow] Taragolis commented on pull request #26687: Added information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #26687:
URL: https://github.com/apache/airflow/pull/26687#issuecomment-1258631963

   cc: @ferruzzi @o-nikolas @vincbeck 




[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r985138469


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,97 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:

Review Comment:
   Actually, `get_conn()` is usually used for this purpose; however, we cannot override this method because it is used to obtain AWS credentials (`aws_conn_id`).
   
   We could create this method, but it would be used in only one place. The current implementation does not contain any complex logic, so personally I do not see any benefit in a separate method.





[GitHub] [airflow] o-nikolas commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982727610


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   I think the code is fine actually. I was just specifically talking about the conn type you're comparing to, the `"emr"`: is there not an Enum, Constant, etc. somewhere that can be used instead of this string?





[GitHub] [airflow] ferruzzi commented on pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on PR #26687:
URL: https://github.com/apache/airflow/pull/26687#issuecomment-1259750445

   > Almost all of the changes relate to documentation or inform users about something that we previously did silently.
   
   I am personally a big fan of transparency.  :+1: 




[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982116922


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   **tl;dr**:
   Right now in the PR I assume that if the user does not provide any `conn_type`, they mean `emr`. However, we could do it differently and treat an unset (or empty) `conn_type` as a sign that something might have gone wrong: `if emr_conn.conn_type != "emr":`
   
   **Long explanation**
   First of all, why do we need some magic here? `conn_type` is not a mandatory attribute of the class `airflow.models.connection.Connection` and could be an empty string or None.
   
   Actually, it is not easy to end up in this state. For a connection created in the UI I think it is almost impossible.
   A URI-based connection would have to omit the scheme, something like `://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@`, and for JSON it might be:
   ```json
   {
      "login": "AKIAIOSFODNN7EXAMPLE",
      "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
   }
   ```
   
   The Connection object does not provide any way to actually validate `conn_type`. So we could use one of:
   1. `if emr_conn.conn_type and emr_conn.conn_type != "emr":`
   2. `if (emr_conn.conn_type or "emr") != "emr":`
   3. `if emr_conn.conn_type not in ("emr", None):`
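
The three conditions are not fully equivalent, which a small standalone comparison makes visible. The function names `option_1` to `option_3` are hypothetical labels for the numbered variants above; nothing here touches Airflow itself.

```python
def option_1(conn_type):
    # 1. Truthiness check first, so None and "" are both treated as "not set".
    return bool(conn_type and conn_type != "emr")


def option_2(conn_type):
    # 2. Fall back to "emr" when the value is falsy (None or "").
    return (conn_type or "emr") != "emr"


def option_3(conn_type):
    # 3. Only None is accepted as "not set"; an empty string counts as a mismatch.
    return conn_type not in ("emr", None)


# Each row shows whether the variant would flag the value as an unexpected type.
for value in ("emr", None, "", "aws"):
    print(repr(value), option_1(value), option_2(value), option_3(value))
```

Variants 1 and 2 agree on all four inputs. Variant 3 differs only for the empty string, which it flags as an unexpected type.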
   
   
   





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r983904200


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   Let me check how it looks in the UI





[GitHub] [airflow] o-nikolas commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r983889877


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   What about: `'Amazon Elastic MapReduce' Airflow Connection cannot be tested, by design it stores only key/value pairs and does not make a connection to an external resource`
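
A sketch of how such a message could be returned, assuming the `(status, message)` tuple shape shown in the diff. `UntestableConnectionHook` is a hypothetical stand-in class, and the `hook_name` value is assumed from the wording of the message.

```python
class UntestableConnectionHook:
    """Hypothetical stand-in for a hook whose connection only stores key/value pairs."""

    hook_name = "Amazon Elastic MapReduce"  # assumed value, taken from the message text

    def test_connection(self):
        # Always report failure, and say why, so the response explains itself.
        message = (
            f"{self.hook_name!r} Airflow Connection cannot be tested, by design it stores "
            "only key/value pairs and does not make a connection to an external resource."
        )
        return False, message


ok, message = UntestableConnectionHook().test_connection()
print(ok, message)
```

Building the text from `hook_name` keeps the quoted service name in sync with the hook's display name.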





[GitHub] [airflow] josh-fell commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982793678


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   This is definitely an outlier here, but IMO it would be nice to include _why_ the connection cannot be tested in the response from the API/UI. Is it because of a user error? The configuration of the environment? Something else? I could foresee some users being puzzled without additional context. WDYT?





[GitHub] [airflow] o-nikolas commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r981794380


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",

Review Comment:
   ```suggestion
                       "using an empty initial configuration. If you want to get rid of this warning "
                       "message please provide a valid `emr_conn_id` or set it to None.",
   ```
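
For context, a self-contained sketch of the fallback this warning belongs to is shown below, using the suggested wording. `ConnectionNotFound`, `lookup_extra`, and the in-memory `CONNECTIONS` store are stand-ins invented for the example (the hook in the diff uses `AirflowNotFoundException` and `self.get_connection`), so treat it as an approximation rather than the hook's code.

```python
import warnings


class ConnectionNotFound(Exception):
    """Stand-in for AirflowNotFoundException."""


# Hypothetical in-memory store: connection id -> Extra dictionary.
CONNECTIONS = {"emr_default": {"Name": "default-cluster"}}


def lookup_extra(conn_id):
    """Return the Extra dictionary for conn_id or raise ConnectionNotFound."""
    try:
        return CONNECTIONS[conn_id]
    except KeyError:
        raise ConnectionNotFound(conn_id) from None


def initial_config(emr_conn_id):
    """Build the initial configuration, falling back to an empty one."""
    config = {}
    if emr_conn_id:  # None or "" skips the lookup without any warning
        try:
            config = lookup_extra(emr_conn_id).copy()
        except ConnectionNotFound:
            warnings.warn(
                f"Unable to find Amazon Elastic MapReduce Connection ID {emr_conn_id!r}, "
                "using an empty initial configuration. If you want to get rid of this warning "
                "message please provide a valid `emr_conn_id` or set it to None.",
                UserWarning,
                stacklevel=2,
            )
    return config
```

With this shape, a missing connection produces a single `UserWarning` and an empty dictionary, while `None` stays silent.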



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -30,8 +31,11 @@
 
 class EmrHook(AwsBaseHook):
     """
-    Interact with AWS EMR. emr_conn_id is only necessary for using the
-    create_job_flow method.
+    Interact with Amazon Elastic MapReduce Service.
+
+    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
+        This attribute only necessary for using in

Review Comment:
   
   ```suggestion
           This attribute is only necessary when using
   ```



##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce (EMR) Connection
+=========================================
+
+.. note::
+  This connection type only use for store parameters for Start EMR Cluster.

Review Comment:
   ```suggestion
     This connection type is only used to store parameters to Start EMR Cluster (`run_job_flow` boto3 EMR client method).
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   Is there not a constant we could replace this magic-string with?



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.

Review Comment:
   ```suggestion
           This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
           If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial configuration is used.
   
           :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration cluster.
               The resulting configuration will be used in the boto3 emr client run_job_flow method.
   ```



##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce (EMR) Connection
+=========================================
+
+.. note::
+  This connection type only use for store parameters for Start EMR Cluster.
+
+  This connection not intend to store any credentials for ``boto3`` client, if you try to pass any
+  parameters not listed in `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+  you will get an error like this.
+
+  .. code-block:: text
+
+      Parameter validation failed: Unknown parameter in input: "region_name", must be one of:
+
+  For Authenticating to AWS please use :ref:`Amazon Web Services Connection <howto/connection:aws>`.
+
+Configuring the Connection
+--------------------------
+
+Extra (optional)
+    Specify the parameters (as json dictionary) that can be used as initial configuration
+    in :meth:`airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow` for propagate to
+    `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_.

Review Comment:
   ```suggestion
       Specify the parameters (as a `json` dictionary) that can be used as an initial configuration
       in :meth:`airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow` to propagate to
       `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_.
   ```
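
As a rough illustration of how the connection Extra and `job_flow_overrides` combine, the sketch below mirrors the `config.update(job_flow_overrides)` step from the hook diff with plain dictionaries. The function name `build_run_job_flow_config` and all sample values are hypothetical; the key names come from the RunJobFlow API.

```python
from __future__ import annotations

from typing import Any


def build_run_job_flow_config(
    conn_extra: dict[str, Any], job_flow_overrides: dict[str, Any]
) -> dict[str, Any]:
    """Start from the connection Extra, then let the overrides win."""
    config = conn_extra.copy()  # shallow copy, as in the hook diff
    config.update(job_flow_overrides)  # top-level keys are replaced, not merged
    return config


# Sample values only; they are not taken from the PR.
conn_extra = {
    "Name": "default-cluster",
    "ReleaseLabel": "emr-6.7.0",
    "Instances": {"KeepJobFlowAliveWhenNoSteps": False, "TerminationProtected": False},
}
overrides = {"Name": "nightly-cluster", "Instances": {"KeepJobFlowAliveWhenNoSteps": True}}

config = build_run_job_flow_config(conn_extra, overrides)
print(config)
```

Because `dict.update` is shallow, the whole `Instances` value from the Extra is replaced by the one from the overrides, so `TerminationProtected` does not survive in the merged result.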



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -332,10 +332,12 @@ class EmrCreateJobFlowOperator(BaseOperator):
         running Airflow in a distributed manner and aws_conn_id is None or
         empty, then default boto3 configuration would be used (and must be
         maintained on each worker node)
-    :param emr_conn_id: emr connection to use for run_job_flow request body.
-        This will be overridden by the job_flow_overrides param
+    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
+        Use for receive initial Amazon EMR cluster configuration:
+        ``boto3.client('emr').run_job_flow`` request body.
+        If this is None or empty or connection not exists then empty initial configuration is used.

Review Comment:
   ```suggestion
           Use to receive an initial Amazon EMR cluster configuration:
           ``boto3.client('emr').run_job_flow`` request body.
           If this is None or empty or the connection does not exist, then an empty initial configuration is used.
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by use default boto3 credential strategy.

Review Comment:
   ```suggestion
           We need to overwrite this method because this hook is based on
           :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
           otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
   ```





[GitHub] [airflow] Taragolis commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982116922


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   **tl;dr**:
   Right now in the PR I assume that if the user does not provide any conn_type, they mean `emr`. However, we could handle it differently: if it is not set (or empty), assume that something might have gone wrong: `if emr_conn.conn_type != "emr":`
   
   **Long explanation**
   First of all, why we need some magic here: `conn_type` is not a mandatory attribute of the class `airflow.models.connection.Connection` and could be an empty string or None.
   
   Actually, it is not easy to end up with an empty conn_type. For a connection created in the UI I think it is almost impossible;
   a URI-based connection would have to omit the scheme, something like `://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@`, and for JSON it might be
   ```json
   {
      "login": "AKIAIOSFODNN7EXAMPLE",
      "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
   }
   ```
   
   The Connection object does not provide any way to actually validate conn_type, so we should do something like one of these:
   1. `if emr_conn.conn_type and emr_conn.conn_type != "emr":`
   2. `if (emr_conn.conn_type or "emr") != "emr":`
   3. `if emr_conn.conn_type not in ("emr", None):`
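
To make the difference between the three candidate checks concrete, here is a small standalone sketch. The helper names `check_1` to `check_3` and the list of sample values are made up for illustration; only the conditions themselves come from the list above.

```python
# Each function mirrors one candidate condition; True means "emit the warning".
def check_1(conn_type):
    return bool(conn_type and conn_type != "emr")


def check_2(conn_type):
    return (conn_type or "emr") != "emr"


def check_3(conn_type):
    return conn_type not in ("emr", None)


# Sample conn_type values, including the two "not set" forms.
samples = [None, "", "emr", "aws"]
results = {repr(ct): (check_1(ct), check_2(ct), check_3(ct)) for ct in samples}

for ct, flags in results.items():
    print(f"conn_type={ct:<6} -> {flags}")
```

For these inputs, options 1 and 2 agree everywhere, while option 3 differs only for the empty string, which it treats as a mismatch.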
   
   
   


