You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/27 23:25:09 UTC

[GitHub] [airflow] o-nikolas commented on a diff in pull request #26687: Add information about Amazon Elastic MapReduce Connection

o-nikolas commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r981794380


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",

Review Comment:
   ```suggestion
                       "using an empty initial configuration. If you want to get rid of this warning "
                       "message please provide a valid `emr_conn_id` or set it to None.",
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -30,8 +31,11 @@
 
 class EmrHook(AwsBaseHook):
     """
-    Interact with AWS EMR. emr_conn_id is only necessary for using the
-    create_job_flow method.
+    Interact with Amazon Elastic MapReduce Service.
+
+    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
+        This attribute only necessary for using in

Review Comment:
   
   ```suggestion
           This attribute is only necessary when using
   ```



##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce (EMR) Connection
+=========================================
+
+.. note::
+  This connection type only use for store parameters for Start EMR Cluster.

Review Comment:
   ```suggestion
     This connection type is only used to store parameters for starting an EMR cluster (the ``run_job_flow`` boto3 EMR client method).
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":

Review Comment:
   Is there not a constant we could replace this magic-string with?



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.

Review Comment:
   ```suggestion
           This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
           If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial configuration is used.
   
           :param job_flow_overrides: Used to override parameters in the initial Amazon EMR cluster configuration.
               The resulting configuration will be used in the boto3 emr client run_job_flow method.
   ```



##########
docs/apache-airflow-providers-amazon/connections/emr.rst:
##########
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:emr:
+
+Amazon Elastic MapReduce (EMR) Connection
+=========================================
+
+.. note::
+  This connection type only use for store parameters for Start EMR Cluster.
+
+  This connection not intend to store any credentials for ``boto3`` client, if you try to pass any
+  parameters not listed in `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+  you will get an error like this.
+
+  .. code-block:: text
+
+      Parameter validation failed: Unknown parameter in input: "region_name", must be one of:
+
+  For Authenticating to AWS please use :ref:`Amazon Web Services Connection <howto/connection:aws>`.
+
+Configuring the Connection
+--------------------------
+
+Extra (optional)
+    Specify the parameters (as json dictionary) that can be used as initial configuration
+    in :meth:`airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow` for propagate to
+    `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_.

Review Comment:
   ```suggestion
       Specify the parameters (as a JSON dictionary) that can be used as an initial configuration
       in :meth:`airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow` to propagate to
       `RunJobFlow API <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_.
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -332,10 +332,12 @@ class EmrCreateJobFlowOperator(BaseOperator):
         running Airflow in a distributed manner and aws_conn_id is None or
         empty, then default boto3 configuration would be used (and must be
         maintained on each worker node)
-    :param emr_conn_id: emr connection to use for run_job_flow request body.
-        This will be overridden by the job_flow_overrides param
+    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
+        Use for receive initial Amazon EMR cluster configuration:
+        ``boto3.client('emr').run_job_flow`` request body.
+        If this is None or empty or connection not exists then empty initial configuration is used.

Review Comment:
   ```suggestion
           Used to receive an initial Amazon EMR cluster configuration:
           ``boto3.client('emr').run_job_flow`` request body.
           If this is None or empty or the connection does not exist, then an empty initial configuration is used.
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or connection not exists than empty initial configuration is used.
+
+        :param job_flow_overrides: Uses for overwrite parameters in initial Amazon EMR configuration cluster.
+            The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "use empty initial configuration. If you want to get rid of this warning "
+                    "message please set `emr_conn_id` to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by use default boto3 credential strategy.

Review Comment:
   ```suggestion
           We need to overwrite this method because this hook is based on
           :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
           otherwise it will try to test the connection to AWS STS using the default boto3 credential strategy.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org