Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/01 22:47:56 UTC

[GitHub] [airflow] dacort opened a new pull request #16766: Add an Amazon EMR on EKS provider package

dacort opened a new pull request #16766:
URL: https://github.com/apache/airflow/pull/16766


   Adds a new provider package in `amazon/aws`.
   
   EMR on EKS is a new deployment model for EMR that allows you to run Spark jobs on EKS.
   
   This package adds a new operator, sensor, and hook for running and monitoring the jobs as well as docs and an example DAG.
   
   _Please note that I'm opening this PR to get some feedback on the style of the plugin_
   
   I've tested a separate version of this on Airflow 2.0.2, but I'm still learning how to do this in my development environment using breeze.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-875107879


   All checks passing - will do a squash after I address any review comments.





[GitHub] [airflow] potiuk commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882821163


   Can you please rebase onto the latest main?





[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r668278696



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info(f"Try {try_number}: Invalid query state. Retrying again")
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")
+            else:
+                self.log.info(f"Try {try_number}: Query execution completed. Final state is {query_state}")

Review comment:
       Ah ok, cool - makes perfect sense, thanks for the explanation. I'll update my logging statements accordingly.







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667309898



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')

Review comment:
       This is not standard behavior for AWS Base Hook, so you need to write a guide on how to configure this connection. Similarly, we have [a GCP connection](http://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html) and [a GCP SSH connection](http://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp_ssh.html).
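
The lookup discussed in this comment reads `virtual_cluster_id` from the connection's extra JSON when it is not passed explicitly. A minimal, stdlib-only sketch of that fallback is shown below. The function name `resolve_virtual_cluster_id`, the placeholder cluster ID, and the use of `ValueError` in place of `AirflowException` are assumptions made for illustration, not part of the PR.

```python
import json
from typing import Optional


def resolve_virtual_cluster_id(explicit_id: Optional[str], connection_extra: Optional[str]) -> str:
    """Return the explicit ID if given, otherwise look it up in the connection's extra JSON.

    Hypothetical helper mirroring the fallback order in the hook under review.
    """
    if explicit_id is not None:
        return explicit_id

    # The "Extra" field of a connection is stored as a JSON string; treat empty as {}.
    extras = json.loads(connection_extra) if connection_extra else {}
    cluster_id = extras.get("virtual_cluster_id")
    if not cluster_id:
        # The hook raises AirflowException here; ValueError keeps this sketch dependency-free.
        raise ValueError("Missing virtual_cluster_id in AWS connection extras")
    return cluster_id


# Hypothetical "Extra" value for an AWS connection; the cluster ID is a placeholder.
example_extra = '{"region_name": "us-east-1", "virtual_cluster_id": "example-cluster-id"}'
print(resolve_virtual_cluster_id(None, example_extra))
```

A connection guide along the lines requested would presumably document the `virtual_cluster_id` key in the extra JSON and the fact that an explicitly passed ID takes precedence.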







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666421192



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"

Review comment:
       Updated to retrieve failure reason from the job description. :)
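
As a rough illustration of what retrieving the failure reason from the job description could look like, the sketch below pulls `failureReason` out of a `describe_job_run`-shaped response and builds an error message from it. The helper names and the sample response are hypothetical; only the `jobRun` / `failureReason` keys are taken from the hook code quoted elsewhere in this thread.

```python
from typing import Any, Dict, Optional


def extract_failure_reason(response: Dict[str, Any]) -> Optional[str]:
    """Return jobRun.failureReason from a describe_job_run-style response, or None if absent."""
    return response.get("jobRun", {}).get("failureReason")


def format_failure_message(job_id: str, state: str, reason: Optional[str]) -> str:
    """Build the message an operator could raise instead of a placeholder string."""
    detail = reason or "no failure reason reported"
    return f"EMR Containers job {job_id} ended in state {state}: {detail}"


# Hypothetical response fragment; the values are placeholders, not real API output.
sample_response = {"jobRun": {"state": "FAILED", "failureReason": "USER_ERROR"}}
print(format_failure_message("job-123", "FAILED", extract_failure_reason(sample_response)))
```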







[GitHub] [airflow] eladkal commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r758372331



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)

Review comment:
       Opened followup task https://github.com/apache/airflow/issues/19877
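
The TODO quoted above notes that polling stops as soon as the state is *not* intermediate. One possible way to tighten that, sketched here under assumptions, is to stop only on an explicitly known final state and treat anything else (including a failed lookup returning `None`) as a reason to keep polling. The function `poll_until_terminal` and its `check_status` / `sleep_fn` parameters are hypothetical, and this is not necessarily what the follow-up issue proposes; the state tuples are copied from the hook in the diff.

```python
from time import sleep
from typing import Callable, Optional

# State tuples as defined on the hook in the diff under review.
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
SUCCESS_STATES = ("COMPLETED",)
TERMINAL_STATES = FAILURE_STATES + SUCCESS_STATES


def poll_until_terminal(
    check_status: Callable[[], Optional[str]],
    max_tries: Optional[int] = None,
    poll_interval: float = 30,
    sleep_fn: Callable[[float], None] = sleep,
) -> Optional[str]:
    """Poll check_status() until it returns a known final state or max_tries is exhausted.

    Returns the final state, or None if max_tries polls passed without reaching one.
    """
    try_number = 1
    while True:
        state = check_status()
        if state in TERMINAL_STATES:
            return state
        # None, an intermediate state, or an unrecognised state: keep polling.
        if max_tries is not None and try_number >= max_tries:
            return None
        try_number += 1
        sleep_fn(poll_interval)


# Usage with a canned sequence of states standing in for real API calls.
states = iter(["PENDING", None, "RUNNING", "COMPLETED"])
print(poll_until_terminal(lambda: next(states), poll_interval=0))
```

Passing the status check in as a callable keeps the loop testable without AWS access; in the hook it would presumably wrap `check_query_status(job_id)`.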







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r668082360



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')

Review comment:
       OK, will do. I'm wondering whether I should remove it if it's not standard. I saw it used in another connector and thought it could be useful to be able to configure the virtual_cluster_id and role_arn via a connection.

##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we log it and return None instead of raising.
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)

Review comment:
       After thinking about this a bit more, my concern was that the logic here relies solely on `INTERMEDIATE_STATES`.
   
   That means that if the API ever changes (not likely), the logic here could break. The only change I would make is a more explicit check that `query_state` is _actually_ in a completed state, but that is effectively the current logic anyway, because the state is either `None`, an `INTERMEDIATE` state, or a `COMPLETED` state.
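
A rough sketch of the more explicit check described above, not the code in this PR. The state tuples are copied from the hook in the diff; `poll_until_terminal`, `TERMINAL_STATES`, and the `check_status` callable are hypothetical names used only for illustration. The loop stops only when the state is a known terminal state, so `None`, an intermediate state, or a state the API adds later all keep the poll going until `max_tries` is reached.

```python
from time import sleep
from typing import Callable, Optional

# Copied from the hook under review.
INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
SUCCESS_STATES = ("COMPLETED",)

# Hypothetical helper constant: every state the loop treats as final.
TERMINAL_STATES = FAILURE_STATES + SUCCESS_STATES


def poll_until_terminal(
    check_status: Callable[[], Optional[str]],
    max_tries: Optional[int] = None,
    poll_interval: float = 30,
) -> Optional[str]:
    """Poll until check_status() returns a known terminal state or max_tries is hit."""
    try_number = 1
    while True:
        state = check_status()
        if state in TERMINAL_STATES:
            return state
        # None, an intermediate state, or an unrecognized state: keep polling.
        if max_tries is not None and try_number >= max_tries:
            return state  # last observed state, which may not be final
        try_number += 1
        sleep(poll_interval)
```

With this shape, a caller can tell a real final state from a timeout by checking whether the returned value is in `TERMINAL_STATES`.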

##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+
+# [START howto_operator_emr_eks_config]
+JOB_DRIVER_ARG = {
+    "sparkSubmitJobDriver": {
+        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
+        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
+    }
+}
+
+CONFIGURATION_OVERRIDES_ARG = {
+    "monitoringConfiguration": {
+        "cloudWatchMonitoringConfiguration": {
+            "logGroupName": "/aws/emr-eks-spark",
+            "logStreamNamePrefix": "airflow",
+        }
+    }
+}
+# [END howto_operator_emr_eks_config]
+
+with DAG(
+    dag_id='emr_eks_pi_job',
+    dagrun_timeout=timedelta(hours=2),
+    start_date=days_ago(1),
+    schedule_interval="@once",
+    tags=["emr_containers", "example"],
+) as dag:
+
+    # An example of how to get the cluster id and arn from an Airflow connection
+    # c = BaseHook.get_connection("emr_eks")

Review comment:
       Updated the example to use a Jinja macro.
   
   ```python
       # An example of how to get the cluster id and arn from an Airflow connection
       # VIRTUAL_CLUSTER_ID = '{{ conn.emr_eks.extra_dejson["virtual_cluster_id"] }}'
       # JOB_ROLE_ARN = '{{ conn.emr_eks.extra_dejson["job_role_arn"] }}'
   ```







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-906927131


   @mik-laj It doesn't look like the build failures are related to the PR? The errors are in the `helm-charts` package, which I haven't touched. Maybe I should do another rebase?
   
   ![image](https://user-images.githubusercontent.com/1512/131074452-94b43c9a-b961-4cce-87c2-c47a35aa2f37.png)
   





[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-907474332


   @mik-laj Looks like they succeeded after a rebase!





[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667418894



##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+
+# [START howto_operator_emr_eks_config]
+JOB_DRIVER_ARG = {
+    "sparkSubmitJobDriver": {
+        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
+        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
+    }
+}
+
+CONFIGURATION_OVERRIDES_ARG = {
+    "monitoringConfiguration": {
+        "cloudWatchMonitoringConfiguration": {
+            "logGroupName": "/aws/emr-eks-spark",
+            "logStreamNamePrefix": "airflow",
+        }
+    }
+}
+# [END howto_operator_emr_eks_config]
+
+with DAG(
+    dag_id='emr_eks_pi_job',
+    dagrun_timeout=timedelta(hours=2),
+    start_date=days_ago(1),
+    schedule_interval="@once",
+    tags=["emr_containers", "example"],
+) as dag:
+
+    # An example of how to get the cluster id and arn from an Airflow connection
+    # c = BaseHook.get_connection("emr_eks")

Review comment:
       Ah interesting, wasn't aware of that - I'll look more into jinja macros. 
   
   My goal was to show how to load the cluster id and role arn from a connection as opposed to hard-coding it in the DAG.
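
A minimal sketch of that lookup order, assuming the connection's extra field is a JSON object with `virtual_cluster_id` and `job_role_arn` keys. `resolve_setting` is a hypothetical helper written for illustration and is not part of Airflow or this PR. It takes the extra JSON as a plain string so it runs without an Airflow installation.

```python
import json
from typing import Optional


def resolve_setting(key: str, explicit_value: Optional[str], connection_extra: Optional[str]) -> str:
    """Return explicit_value if given, otherwise look up key in the connection's extra JSON."""
    if explicit_value is not None:
        return explicit_value
    # An empty or missing extra field is treated as an empty JSON object.
    extras = json.loads(connection_extra) if connection_extra else {}
    value = extras.get(key)
    if value:
        return value
    raise ValueError(f"Missing {key}: pass it explicitly or set it in the connection extra JSON")


# Example extra JSON; the values are placeholders.
extra = '{"virtual_cluster_id": "abc123", "job_role_arn": "arn:aws:iam::012345678912:role/emr_eks_default_role"}'
cluster_id = resolve_setting("virtual_cluster_id", None, extra)
role_arn = resolve_setting("job_role_arn", None, extra)
```

An explicitly passed value wins over the connection, so a DAG can still override the connection default for a single task.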







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-878647076


   "Build PROD images 3.6" step was failing, due to what I think were some recent changes in the `main` branch. Rebased off main and will see if that fixes it.





[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-878645934


   @mik-laj OK, I think I've addressed all your feedback. Let me know if there's anything else needed!





[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665557984



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.get_conn().start_job_run(**params)

Review comment:
       It should - I'll go ahead and move to `self.conn`.







[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665519796



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.get_conn().start_job_run(**params)

Review comment:
       `self.get_conn()` is being deprecated.  Will this work with the preferred `self.conn`?
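
For illustration only, here is a sketch of the attribute-style access pattern in question. `SketchHook` and `FakeClient` are hypothetical stand-ins, and the layout (a cached `conn` property with `get_conn()` as a thin wrapper) is an assumption, not a copy of `AwsBaseHook`. It uses `functools.cached_property`, which needs Python 3.8 or later.

```python
from functools import cached_property  # Python 3.8+


class FakeClient:
    """Stand-in for a boto3 client so the sketch runs without AWS credentials."""

    def start_job_run(self, **params):
        return {"id": "job-123", "virtualClusterId": params["virtualClusterId"]}


class SketchHook:
    """Hypothetical hook; the layout is an assumption, not AwsBaseHook's source."""

    @cached_property
    def conn(self) -> FakeClient:
        # Built on first access, then reused for the lifetime of the hook instance.
        return FakeClient()

    def get_conn(self) -> FakeClient:
        # Older accessor kept as a thin wrapper around the cached attribute.
        return self.conn


hook = SketchHook()
response = hook.conn.start_job_run(virtualClusterId="abc123")
```

Under that assumption, switching a call site from `self.get_conn().start_job_run(...)` to `self.conn.start_job_run(...)` reaches the same client object.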







[GitHub] [airflow] github-actions[bot] commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-907518209


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665617689



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
+                    query_status, self.job_id, error_message
+                )
+            )
+        elif not query_status or query_status in EMRContainerHook.INTERMEDIATE_STATES:
+            raise AirflowException(
+                "Final state of EMR Containers job is {}. "
+                "Max tries of poll status exceeded, query_execution_id is {}.".format(
+                    query_status, self.job_id
+                )
+            )
+
+        return self.job_id
+
+    def on_kill(self) -> None:
+        """Cancel the submitted job run"""
+        if self.job_id:
+            self.log.info("Stopping job run with jobId - %s", self.job_id)
+            response = self.hook.stop_query(self.job_id)
+            http_status_code = None
+            try:
+                http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
+            except Exception as ex:  # pylint: disable=broad-except

Review comment:
       Good point...my initial thought was that we don't care if there's an exception while killing the job.
   
   What's the "typical" approach if an operator encounters an exception in `on_kill`?
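
A minimal sketch of one option for the question above, offered as an assumption rather than an established Airflow convention: log any failure while cancelling and carry on, since the task is already being torn down. `FakeHook` and `kill_job` are hypothetical names used only for illustration; `stop_query` mirrors the hook call in the quoted diff.

```python
import logging

log = logging.getLogger(__name__)


class FakeHook:
    """Hypothetical stand-in for the EMR hook; stop_query always fails here."""

    def stop_query(self, job_id):
        raise RuntimeError(f"could not stop {job_id}")


def kill_job(hook, job_id):
    """Try to cancel the job; log and swallow any error. Returns True if cancelled."""
    if not job_id:
        return False
    try:
        response = hook.stop_query(job_id)
        status = response["ResponseMetadata"]["HTTPStatusCode"]
    except Exception as ex:  # broad on purpose: a failed cancel should not mask the kill
        log.error("Exception while cancelling job %s: %s", job_id, ex)
        return False
    if status != 200:
        log.error("Unable to cancel job %s, HTTP status %s", job_id, status)
        return False
    return True
```

Whether swallowing the error is acceptable depends on how much an orphaned job run costs; re-raising after logging would be the stricter alternative.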







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-902844207


   @ashb Thanks for pinging folks - I just pushed one more update with a little documentation cleanup.





[GitHub] [airflow] wanderijames commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
wanderijames commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r684601381



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: str = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = virtual_cluster_id
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                "Start Job Run success - Job Id %s and virtual cluster id %s",
+                response['id'],
+                response['virtualClusterId'],
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info("Try %s: Invalid query state. Retrying again", try_number)
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info("Try %s: Query is still in an intermediate state - %s", try_number, query_state)
+            else:
+                self.log.info("Try %s: Query execution completed. Final state is %s", try_number, query_state)
+                final_query_state = query_state
+                break
+            if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
+                final_query_state = query_state
+                break
+            try_number += 1
+            sleep(poll_interval)
+        return final_query_state

Review comment:
       Have you thought about jobs that are not expected to terminate within a relatively short time? If I submit a Spark streaming job using this operator, the job needs to keep running for much longer. Would the status checks need some kind of backoff algorithm?
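
To make the backoff idea concrete, here is a rough sketch of what backoff-based polling could look like. It is not part of the PR: `backoff_intervals`, `poll_with_backoff`, and their parameters are hypothetical, and `check_status` stands in for the hook's `check_query_status`.

```python
from time import sleep as real_sleep

INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")


def backoff_intervals(initial=30, factor=2, maximum=600):
    """Yield poll intervals in seconds: initial, initial*factor, ... capped at maximum."""
    interval = initial
    while True:
        yield min(interval, maximum)
        interval = min(interval * factor, maximum)


def poll_with_backoff(check_status, max_tries=None, sleep=real_sleep, **backoff):
    """Poll check_status() until it returns a non-intermediate state or max_tries is hit."""
    state = None
    for try_number, interval in enumerate(backoff_intervals(**backoff), start=1):
        state = check_status()
        if state is not None and state not in INTERMEDIATE_STATES:
            return state  # the job reached a final state
        if max_tries and try_number >= max_tries:
            return state  # give up; may be None or an intermediate state
        sleep(interval)
```

Capping the interval keeps a long-running streaming job from being polled every 30 seconds for days, while still noticing a failure within `maximum` seconds.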







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666444103



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
+                    query_status, self.job_id, error_message
+                )
+            )
+        elif not query_status or query_status in EMRContainerHook.INTERMEDIATE_STATES:
+            raise AirflowException(
+                "Final state of EMR Containers job is {}. "
+                "Max tries of poll status exceeded, query_execution_id is {}.".format(
+                    query_status, self.job_id

Review comment:
       It should, yes. If `None` is returned, it means we exceeded the number of max tries and the final state will be printed as "None" in the error message.
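
The branching described here can be sketched in isolation. The state tuples are copied from the hook in this PR; `describe_outcome` is a hypothetical helper for illustration and returns the message instead of raising.

```python
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")


def describe_outcome(final_state, job_id):
    """Return an error message for a bad final state, or None if the job succeeded."""
    if final_state in FAILURE_STATES:
        return f"EMR Containers job failed. Final state is {final_state}, job id is {job_id}."
    if not final_state or final_state in INTERMEDIATE_STATES:
        # None (status never retrieved) and intermediate states both mean polling gave up
        return (
            f"Final state of EMR Containers job is {final_state}. "
            f"Max tries of poll status exceeded, job id is {job_id}."
        )
    return None
```

With `final_state=None`, the second branch is taken and the message literally contains "None", which matches the behaviour described in the comment.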







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882837737


   @potiuk Yep, rebased. 





[GitHub] [airflow] potiuk commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-878805530


   It's an undesired side effect of #16950. Working on a fix.





[GitHub] [airflow] ashb commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-902222417


   @ferruzzi @o-nikolas Any opinions on this?





[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r692455216



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: str = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = virtual_cluster_id
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                "Start Job Run success - Job Id %s and virtual cluster id %s",
+                response['id'],
+                response['virtualClusterId'],
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info("Try %s: Invalid query state. Retrying again", try_number)
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info("Try %s: Query is still in an intermediate state - %s", try_number, query_state)
+            else:
+                self.log.info("Try %s: Query execution completed. Final state is %s", try_number, query_state)
+                final_query_state = query_state
+                break
+            if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
+                final_query_state = query_state
+                break
+            try_number += 1
+            sleep(poll_interval)
+        return final_query_state

Review comment:
       @wanderijames That's a good point. I don't know if we need to implement a backoff, since we already have the option to change the poll interval. But that's also where I think your PR is nice, in that its operator just starts the job.
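
For reference, the backoff idea discussed here could look roughly like the following. This is only a sketch under assumptions, not code from the PR: `poll_with_backoff`, `backoff_factor`, `max_interval`, and the injectable `sleep` argument are hypothetical names introduced for illustration, and `check_status` stands in for a call such as the hook's `check_query_status`.

```python
from time import sleep as default_sleep
from typing import Callable, Optional

# Mirrors the INTERMEDIATE_STATES tuple on the hook in this PR.
INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")


def poll_with_backoff(
    check_status: Callable[[], Optional[str]],
    max_tries: Optional[int] = None,
    poll_interval: float = 30,
    backoff_factor: float = 2.0,  # hypothetical parameter
    max_interval: float = 300,  # hypothetical parameter
    sleep: Callable[[float], None] = default_sleep,
) -> Optional[str]:
    """Poll until a final state is seen, growing the delay after each try."""
    delay = poll_interval
    try_number = 1
    while True:
        state = check_status()
        # None means the status call failed; treat it like an intermediate state.
        if state is not None and state not in INTERMEDIATE_STATES:
            return state
        if max_tries and try_number >= max_tries:
            return state
        sleep(delay)
        # Grow the delay geometrically, but never beyond max_interval.
        delay = min(delay * backoff_factor, max_interval)
        try_number += 1
```

With `backoff_factor=1` this reduces to the fixed-interval polling already in the hook, which is one reason a plain `poll_interval` may be enough for now.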







[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665566500



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"

Review comment:
       More useful error message, perhaps? :)
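
One possible shape for a more useful message is sketched below. It assumes the failure reason is fetched separately (for example via the hook's `get_job_failure_reason`, which may return None); `build_failure_message` itself is a hypothetical helper and not part of the PR.

```python
from typing import Optional


def build_failure_message(final_state: str, job_id: str, reason: Optional[str]) -> str:
    """Format a failure message that names the state, the job, and the reason."""
    # The reason lookup can fail, so fall back to an explicit placeholder.
    detail = reason if reason else "no failure reason was returned by the service"
    return (
        f"EMR Containers job failed. Final state is {final_state}, "
        f"job ID is {job_id}. Error: {detail}"
    )
```

The operator could then raise `AirflowException(build_failure_message(...))` instead of a placeholder string.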







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665614868



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,

Review comment:
       Sounds good - I think I'll also rename `sleep_time` to `poll_interval` as it's slightly more accurate/descriptive.







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667308718



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            # caller (poll_query_status) sees None and retries on its next poll.
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run

Review comment:
       ```suggestion
   
           :param job_id: Id of submitted job run
   ```
   To render docs correctly.
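
The reason is that reStructuredText only recognises a field list (`:param ...:`) when a blank line separates it from the preceding paragraph; otherwise the fields are folded into that paragraph. A small illustrative check follows; `fields_are_separated` is a hypothetical helper written for this note, and the function body is a stub.

```python
import inspect


def poll_query_status(job_id: str) -> None:
    """
    Poll the status of submitted job run until query state reaches final state.
    Returns one of the final states.

    :param job_id: Id of submitted job run
    :type job_id: str
    """


def fields_are_separated(func) -> bool:
    """Return True if the first ``:param`` line is preceded by a blank line."""
    lines = inspect.getdoc(func).splitlines()
    for index, line in enumerate(lines):
        if line.startswith(":param"):
            return index > 0 and lines[index - 1].strip() == ""
    return True  # no field list, so there is nothing to check
```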







[GitHub] [airflow] mik-laj commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-906815913


   @dacort  Docs have some issues. Can you look at it?





[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-885290789


   This one is specifically for the Amazon EMR runtime. AWS customers today run EMR on EC2, but EMR on EKS was introduced last year as a way to run EMR jobs on EKS. In addition to simply running Spark jobs, it also provides the EMR Spark runtime, hosted Spark UI, integration with EMR Studio, and additional functionality like S3/CloudWatch logging, pod templates, and custom containers based on the EMR runtime. 
   
   You can see more about what EMR on EKS provides here: https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks.html





[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665561297



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,

Review comment:
       Should these be type hinted as Optional? I'd also like to see the default values moved to constants, but that's maybe personal preference.
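
A rough sketch of what that could look like, assuming module-level constants: the constant names and the `PollingSettings` class are hypothetical and only stand in for the operator's `__init__`. Note that `Optional[...]` is only needed for parameters that may actually be None, such as `max_tries`.

```python
from typing import Optional

# Hypothetical module-level defaults; the values match the ones in the diff above.
DEFAULT_AWS_CONN_ID = "aws_default"
DEFAULT_POLL_INTERVAL = 30  # seconds between status checks
DEFAULT_MAX_TRIES = None  # None means poll until a final state is reached


class PollingSettings:
    """Stand-in for the operator constructor, showing only the defaulted arguments."""

    def __init__(
        self,
        *,
        aws_conn_id: str = DEFAULT_AWS_CONN_ID,
        poll_interval: int = DEFAULT_POLL_INTERVAL,
        max_tries: Optional[int] = DEFAULT_MAX_TRIES,
    ) -> None:
        self.aws_conn_id = aws_conn_id
        self.poll_interval = poll_interval
        self.max_tries = max_tries
```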







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667422140



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type poll_interval: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        poll_interval: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.poll_interval = poll_interval
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.poll_interval)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            error_message = self.hook.get_job_failure_reason(self.job_id)
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(

Review comment:
       Yep, will do!







[GitHub] [airflow] ashb commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666054245



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,

Review comment:
       Inline defaults are fine, and that is the more normal style we use, especially as the constant would only be used in this one place.







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667422122



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            # caller (poll_query_status) sees a None state and retries.
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info(f"Try {try_number}: Invalid query state. Retrying again")
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")
+            else:
+                self.log.info(f"Try {try_number}: Query execution completed. Final state is {query_state}")

Review comment:
       I'm a little confused - was this comment intended for when I use `.format()` in some of the other code?
   
   I dug into this a little bit, and [this GitHub comment](https://github.com/PyCQA/pylint/issues/2395#issue-348786471) linked from Stack Overflow makes it seem like f-strings and the %-operator are about the same performance-wise?
   
   Happy to change it, just wanted to make sure I'm doing the right thing.
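
A minimal sketch of the distinction under discussion, using only the standard `logging` module. The `CountingState` class and the logger name are hypothetical, introduced purely for illustration. The sketch suggests that the difference is less about raw formatting speed and more about when formatting happens: with %-style arguments the logger formats the message only if the record is actually emitted, whereas an f-string is always built before `log.info` is called.

```python
import logging


class CountingState:
    """Hypothetical helper that counts how often it is rendered to a string."""

    def __init__(self) -> None:
        self.renders = 0

    def __str__(self) -> str:
        self.renders += 1
        return "RUNNING"


log = logging.getLogger("emr_containers_demo")  # hypothetical logger name
log.setLevel(logging.WARNING)  # INFO records are dropped by this logger

state = CountingState()

# %-style arguments: the logger never formats them because INFO is disabled.
log.info("Try %s: Query is still in an intermediate state - %s", 1, state)
lazy_renders = state.renders

# f-string: the message is built eagerly, before the logger checks the level.
log.info(f"Try {1}: Query is still in an intermediate state - {state}")
eager_renders = state.renders - lazy_renders
```

When the log level is enabled, both forms end up doing the same formatting work, which is consistent with the "about the same performance-wise" observation above.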







[GitHub] [airflow] potiuk commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882821163


   Can you please rebase onto the latest main?





[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-876925890


   Trying to get all the tests to green - some of the (unrelated) integration tests are timing out for some reason.





[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667310762



##########
File path: docs/apache-airflow-providers-amazon/operators/emr_eks.rst
##########
@@ -0,0 +1,87 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _howto/operator:EMRContainersOperators:
+
+Amazon EMR on EKS Operators
+===========================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon EMR on Amazon EKS integration provides a way to run Apache Spark jobs on Kubernetes.
+
+- :class:`~airflow.providers.amazon.aws.operators.emr_containers.EMRContainerOperator`
+
+There is an example dag that shows how to run a job with this operator.
+
+- example_emr_eks_job.py
+
+Create EMR on EKS job with sample script
+----------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_emr_eks_job.py`` uses ``EMRContainerOperator`` to create a new EMR on EKS job calculating the mathematical constant ``Pi``, and monitors the progress
+with ``EMRContainerSensor``.
+
+This example assumes that you already have an EMR on EKS virtual cluster configured. See the `EMR on EKS Getting Started guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/getting-started.html>`__ for more information.
+
+Environment variables
+"""""""""""""""""""""
+
+This example relies on the following variables, which can be passed via OS environment variables.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+    :language: python
+    :start-after: [START howto_operator_emr_eks_env_variables]
+    :end-before: [END howto_operator_emr_eks_env_variables]
+
+Job configuration
+"""""""""""""""""
+
+To create a job for EMR on EKS, you need to specify your job configuration, any monitoring configuration, and a few other details for the job.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+    :language: python
+    :start-after: [START howto_operator_emr_eks_config]
+    :end-before: [END howto_operator_emr_eks_config]
+
+With EMR on EKS, you specify your Spark configuration, the EMR release you want to use, the IAM role to use for the job, and the EMR virtual cluster ID.
+
+In the example, we retrieve the ``virtual_cluster_id`` and ``execution_role_arn`` values from environment variables, but you can store them in a Connection or provide them in the DAG.

Review comment:
       ```suggestion
   We pass the ``virtual_cluster_id`` and ``execution_role_arn`` values as operator parameters, but you can store them in a Connection or provide them in the DAG.
   ```
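
As a rough illustration of the values the documentation paragraph refers to, here is a sketch of how the job configuration might be assembled. The environment variable names, default values, release label, and log group are all placeholders assumed for this example; only the nested key names (`sparkSubmitJobDriver`, `monitoringConfiguration`, `cloudWatchMonitoringConfiguration`) follow the EMR on EKS `start_job_run` request shape. The keyword names in `operator_kwargs` mirror the `EMRContainerOperator` parameters shown in this pull request.

```python
import os

# Placeholder environment variable names and defaults (assumptions for this sketch).
VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")

# Spark job definition passed as ``job_driver``.
JOB_DRIVER_ARG = {
    "sparkSubmitJobDriver": {
        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2",
    }
}

# Optional monitoring settings passed as ``configuration_overrides``.
CONFIGURATION_OVERRIDES_ARG = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/aws/emr-eks-spark",  # placeholder log group
            "logStreamNamePrefix": "airflow",
        }
    }
}

# Keyword arguments that would be handed to the operator inside a DAG.
operator_kwargs = {
    "name": "pi.py",
    "virtual_cluster_id": VIRTUAL_CLUSTER_ID,
    "execution_role_arn": JOB_ROLE_ARN,
    "release_label": "emr-6.3.0-latest",  # placeholder release label
    "job_driver": JOB_DRIVER_ARG,
    "configuration_overrides": CONFIGURATION_OVERRIDES_ARG,
}
```

Reading the cluster ID and role ARN from the environment keeps account-specific values out of the DAG file, while the job driver and overrides stay visible next to the task that uses them.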







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666433426



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.get_conn().start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.get_conn().describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.get_conn().exceptions.ResourceNotFoundException:

Review comment:
       It does, yes, but it's probably better to add a `ClientError` handler rather than just a generic `Exception`.







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r668298827



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')

Review comment:
       OK, going to remove it since it's non-standard.







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882837737


   @potiuk Yep, rebased. 





[GitHub] [airflow] wanderijames commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
wanderijames commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r693494937



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: str = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = virtual_cluster_id
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                "Start Job Run success - Job Id %s and virtual cluster id %s",
+                response['id'],
+                response['virtualClusterId'],
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow it so that polling can retry.
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info("Try %s: Invalid query state. Retrying again", try_number)
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info("Try %s: Query is still in an intermediate state - %s", try_number, query_state)
+            else:
+                self.log.info("Try %s: Query execution completed. Final state is %s", try_number, query_state)
+                final_query_state = query_state
+                break
+            if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
+                final_query_state = query_state
+                break
+            try_number += 1
+            sleep(poll_interval)
+        return final_query_state

Review comment:
       Cool. Thanks
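
The polling loop in `poll_query_status` quoted above can be exercised without AWS by swapping in a stand-in status function. This is a minimal sketch under stated assumptions, not the hook's code: `poll_until_final`, the injected `sleep_fn`, and the canned state sequences are hypothetical names introduced only for illustration.

```python
from typing import Callable, Iterator, Optional

INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")


def poll_until_final(
    check_status: Callable[[], Optional[str]],
    max_tries: Optional[int] = None,
    poll_interval: int = 30,
    # Hypothetical injection point: a no-op here, time.sleep in real use.
    sleep_fn: Callable[[float], None] = lambda seconds: None,
) -> Optional[str]:
    """Poll until the state is not intermediate, or until max_tries is reached."""
    try_number = 1
    while True:
        state = check_status()
        # None means the status call failed; treat it like an intermediate state.
        if state is not None and state not in INTERMEDIATE_STATES:
            return state
        if max_tries and try_number >= max_tries:
            return state
        try_number += 1
        sleep_fn(poll_interval)


# Stand-in for check_query_status: one failed call, then RUNNING, then COMPLETED.
states: Iterator[Optional[str]] = iter([None, "RUNNING", "COMPLETED"])
final_state = poll_until_final(lambda: next(states))

# With max_tries=2 the loop gives up while the job is still running.
capped: Iterator[Optional[str]] = iter(["PENDING", "RUNNING", "COMPLETED"])
capped_state = poll_until_final(lambda: next(capped), max_tries=2)
```

Note that when `max_tries` is set, the returned state may still be an intermediate one (or `None`), so a caller would need to handle that case explicitly.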







[GitHub] [airflow] potiuk commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882821163


   Can you please rebase to the latest main?





[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-882837737


   @potiuk Yep, rebased. 





[GitHub] [airflow] potiuk merged pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #16766:
URL: https://github.com/apache/airflow/pull/16766


   





[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667310435



##########
File path: docs/apache-airflow-providers-amazon/operators/emr_eks.rst
##########
@@ -0,0 +1,87 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _howto/operator:EMRContainersOperators:
+
+Amazon EMR on EKS Operators
+===========================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon EMR on Amazon EKS integration provides a way to run Apache Spark jobs on Kubernetes.
+
+- :class:`~airflow.providers.amazon.aws.operators.emr_containers.EMRContainerOperator`
+
+There is an example dag that shows how to run a job with this operator.
+
+- example_emr_eks_job.py

Review comment:
       ```suggestion
   ```
   I know that some AWS guides reference an example DAG, but it is not necessary here. A bare filename has no value on its own; at a minimum we would need a link so that the reader can open the file.
   
   On the other hand, each example already uses the exampleinclude directive, which gives us a link to the source code.







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667309318



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type poll_interval: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        poll_interval: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.poll_interval = poll_interval
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.poll_interval)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            error_message = self.hook.get_job_failure_reason(self.job_id)
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(

Review comment:
       Can you use f-string formatting here? This is the preferred text formatting style for this project.
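
For illustration, the two styles produce the same message. The values below are made-up placeholders, and the argument order passed to `.format()` is an assumption, since the quoted diff is cut off at that call:

```python
# Placeholder values for illustration only; not output from a real job run.
query_status = "FAILED"
job_id = "example-job-id"
error_message = "example failure reason"

# Style used in the quoted diff (argument order assumed).
with_format = "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
    query_status, job_id, error_message
)

# Equivalent f-string, split across two literals to keep the lines short.
with_fstring = (
    f"EMR Containers job failed. Final state is {query_status}, "
    f"query_execution_id is {job_id}. Error: {error_message}"
)
```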







[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665549987



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.get_conn().start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.get_conn().describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.get_conn().exceptions.ResourceNotFoundException:

Review comment:
       Does this work?  I would expect this to have been wrapped in a `ClientError`.
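
As far as I know, the modeled exceptions that botocore exposes under `client.exceptions` are generated as subclasses of `ClientError`, so catching the specific exception first and a generic `ClientError` afterwards (as a later revision of this hook does) should behave as intended. The sketch below only illustrates the except-clause ordering; both classes are local stand-ins, not the real botocore classes, and `classify` is a hypothetical helper.

```python
class ClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError (not the real class)."""


class ResourceNotFoundException(ClientError):
    """Stand-in for a modeled service exception derived from ClientError."""


def classify(exc: Exception) -> str:
    """Mirror the except-clause order used in check_query_status."""
    try:
        raise exc
    except ResourceNotFoundException:
        # The hook raises AirflowException here: the job does not exist.
        return "fatal"
    except ClientError:
        # The hook logs the error and returns None, so polling can retry.
        return "retry"


not_found = classify(ResourceNotFoundException("job not found"))
generic = classify(ClientError("throttled"))
```

If the two handlers were listed in the opposite order, the generic `ClientError` clause would also catch the not-found case and the specific handler would never run.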







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667309111



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow it so that polling can retry.
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info(f"Try {try_number}: Invalid query state. Retrying again")
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")
+            else:
+                self.log.info(f"Try {try_number}: Query execution completed. Final state is {query_state}")

Review comment:
       ```suggestion
                   self.log.info("Try %s: Query execution completed. Final state is %s", try_number, query_state)
   ```
   Please avoid formatting the string before passing it to the logger. See: https://stackoverflow.com/questions/34619790/pylint-message-logging-format-interpolation
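
A minimal sketch of why this matters, using only the standard `logging` module. The `ExpensiveRepr` class and the logger name are hypothetical and exist only for illustration. With an f-string the message is built before the logger is called, even if the level is disabled. With `%s` arguments the formatting is deferred and skipped entirely when the record is not emitted.

```python
import logging


class ExpensiveRepr:
    """Hypothetical helper that counts how often it is converted to a string."""

    def __init__(self):
        self.str_calls = 0

    def __str__(self):
        self.str_calls += 1
        return "RUNNING"


# Logger set to INFO, so DEBUG records are dropped.
logger = logging.getLogger("emr_containers_logging_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(logging.NullHandler())
logger.propagate = False

eager_state = ExpensiveRepr()
lazy_state = ExpensiveRepr()

# f-string: the message is formatted before logger.debug() is even called.
logger.debug(f"Try 1: state is {eager_state}")

# %-style arguments: formatting is deferred and never happens here,
# because DEBUG is disabled for this logger.
logger.debug("Try %s: state is %s", 1, lazy_state)
```

Besides the saved work, the constant format string also lets log aggregators group identical messages.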

##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info(f"Try {try_number}: Invalid query state. Retrying again")
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")
+            else:
+                self.log.info(f"Try {try_number}: Query execution completed. Final state is {query_state}")
+                final_query_state = query_state
+                break
+            if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
+                final_query_state = query_state
+                break
+            try_number += 1
+            sleep(poll_interval)
+        return final_query_state
+
+    def stop_query(self, job_id: str) -> Dict:
+        """
+        Cancel the submitted job_run
+        :param job_id: Id of submitted job_run

Review comment:
       ```suggestion
   
           :param job_id: Id of submitted job_run
   ```







[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665567789



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
+                    query_status, self.job_id, error_message
+                )
+            )
+        elif not query_status or query_status in EMRContainerHook.INTERMEDIATE_STATES:
+            raise AirflowException(
+                "Final state of EMR Containers job is {}. "
+                "Max tries of poll status exceeded, query_execution_id is {}.".format(
+                    query_status, self.job_id

Review comment:
       This branch is entered when `not query_status` is true, yet the error message then formats `query_status`. Is that going to work as expected?
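
For reference, a small sketch of what happens in that case and one possible way to split the branches. The helper `describe_poll_result` is hypothetical and not part of the PR; the state names are copied from `INTERMEDIATE_STATES` in the hook.

```python
from typing import Optional

INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")

# What the current message produces when the status is None:
current_message = "Final state of EMR Containers job is {}. ".format(None)


def describe_poll_result(query_status: Optional[str], job_id: str) -> Optional[str]:
    """Return an error message for a non-final poll result, or None if the state is final."""
    if query_status is None:
        # No state could be fetched at all, so say that explicitly.
        return f"Could not determine the state of job {job_id}."
    if query_status in INTERMEDIATE_STATES:
        return f"Job {job_id} is still {query_status}; max tries of poll status exceeded."
    return None
```

So the existing code does not crash, it just reports the final state as the literal text `None`. A separate branch makes the message say what actually went wrong.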







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-885299698


   @mik-laj I updated the documentation to help folks understand what EMR on EKS is and how to use the operator.
   
   Let me know what you think!





[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665571458



##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
+                    query_status, self.job_id, error_message
+                )
+            )
+        elif not query_status or query_status in EMRContainerHook.INTERMEDIATE_STATES:
+            raise AirflowException(
+                "Final state of EMR Containers job is {}. "
+                "Max tries of poll status exceeded, query_execution_id is {}.".format(
+                    query_status, self.job_id
+                )
+            )
+
+        return self.job_id
+
+    def on_kill(self) -> None:
+        """Cancel the submitted job run"""
+        if self.job_id:
+            self.log.info("Stopping job run with jobId - %s", self.job_id)
+            response = self.hook.stop_query(self.job_id)
+            http_status_code = None
+            try:
+                http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
+            except Exception as ex:  # pylint: disable=broad-except

Review comment:
       Should this be swallowed or rethrown after logging?
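
To make the two options concrete, here is a rough sketch. The function `extract_http_status` and its `reraise` flag are hypothetical and only illustrate the choice; they are not taken from the PR. It also narrows the broad `except Exception` to the `KeyError` that a missing response field would raise.

```python
import logging

logger = logging.getLogger("on_kill_sketch")
logger.addHandler(logging.NullHandler())


def extract_http_status(response: dict, reraise: bool = False):
    """Read the HTTP status code from a boto3-style response dict."""
    try:
        return response["ResponseMetadata"]["HTTPStatusCode"]
    except KeyError:
        # logger.exception records the traceback along with the message.
        logger.exception("Could not read HTTPStatusCode from response: %s", response)
        if reraise:
            # Rethrow: the caller sees the failure.
            raise
        # Swallow: the caller only sees None.
        return None
```

Swallowing keeps `on_kill` from failing during shutdown, while rethrowing surfaces a malformed response. Which one is right depends on whether a failed cancel should be visible to the user.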







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667310670



##########
File path: docs/apache-airflow-providers-amazon/operators/emr_eks.rst
##########
@@ -0,0 +1,87 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _howto/operator:EMRContainersOperators:
+
+Amazon EMR on EKS Operators
+===========================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+Airflow to Amazon EMR on Amazon EKS integration provides a way to run Apache Spark jobs on Kubernetes.
+
+- :class:`~airflow.providers.amazon.aws.operators.emr_containers.EMRContainerOperator`
+
+There is an example dag that shows how to run a job with this operator.
+
+- example_emr_eks_job.py
+
+Create EMR on EKS job with sample script
+----------------------------------------
+
+Purpose
+"""""""
+
+This example dag ``example_emr_eks_job.py`` uses ``EMRContainerOperator`` to create a new EMR on EKS job calculating the mathematical constant ``Pi``, and monitors the progress
+with ``EMRContainerSensor``.
+
+This example assumes that you already have an EMR on EKS virtual cluster configured. See the `EMR on EKS Getting Started guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/getting-started.html>`__ for more information.
+
+Environment variables
+"""""""""""""""""""""
+
+This example relies on the following variables, which can be passed via OS environment variables.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+    :language: python
+    :start-after: [START howto_operator_emr_eks_env_variables]
+    :end-before: [END howto_operator_emr_eks_env_variables]

Review comment:
       ```suggestion
   ```
   This has no value for the end user. The reader should learn how to use the operator, not how this specific example works.







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667308523



##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+
+# [START howto_operator_emr_eks_config]
+JOB_DRIVER_ARG = {
+    "sparkSubmitJobDriver": {
+        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
+        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
+    }
+}
+
+CONFIGURATION_OVERRIDES_ARG = {
+    "monitoringConfiguration": {
+        "cloudWatchMonitoringConfiguration": {
+            "logGroupName": "/aws/emr-eks-spark",
+            "logStreamNamePrefix": "airflow",
+        }
+    }
+}
+# [END howto_operator_emr_eks_config]
+
+with DAG(
+    dag_id='emr_eks_pi_job',
+    dagrun_timeout=timedelta(hours=2),
+    start_date=days_ago(1),
+    schedule_interval="@once",
+    tags=["emr_containers", "example"],
+) as dag:
+
+    # An example of how to get the cluster id and arn from an Airflow connection
+    # c = BaseHook.get_connection("emr_eks")

Review comment:
       What is the purpose of this comment? If you are suggesting that users add this code to the DAG, that is not a good idea: the connection query would then be executed every time the file is loaded. We should use a Jinja macro instead. See: https://github.com/apache/airflow/issues/14597
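
A toy illustration of the difference, in plain Python. This is not Airflow or Jinja code: `get_connection`, the `{conn_id:...}` placeholder syntax, and the `parse_*` and `execute` functions are all hypothetical stand-ins for a metadata-database lookup, a template, DAG parsing, and task execution.

```python
lookups = []


def get_connection(conn_id: str) -> dict:
    """Hypothetical stand-in for a query against the metadata database."""
    lookups.append(conn_id)
    return {"virtual_cluster_id": "vc-123"}


def parse_dag_eager() -> dict:
    # The lookup runs during parsing, i.e. on every load of the DAG file.
    return {"virtual_cluster_id": get_connection("emr_eks")["virtual_cluster_id"]}


def parse_dag_templated() -> dict:
    # Only a placeholder is stored; nothing is queried while parsing.
    return {"virtual_cluster_id": "{conn_id:emr_eks}"}


def execute(task: dict) -> str:
    # The placeholder is resolved once, when the task actually runs.
    value = task["virtual_cluster_id"]
    prefix = "{conn_id:"
    if value.startswith(prefix) and value.endswith("}"):
        return get_connection(value[len(prefix):-1])["virtual_cluster_id"]
    return value
```

Parsing the eager version three times performs three lookups, while the templated version performs none until `execute` is called.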







[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r668252966



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')

Review comment:
       I have no opinion on this matter. You can delete it if you want, because users can then use `default_args` to limit duplication.
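
To make the `default_args` route concrete, here is a minimal sketch of the precedence involved. It is a plain-Python stand-in, not Airflow code: `fill_from_default_args` is a hypothetical helper that only mimics how values in a DAG's `default_args` fill operator arguments a task did not set explicitly, and the cluster IDs are made up. The real mechanism is more selective, since it only applies keys that match the operator's parameters.

```python
from typing import Any, Dict


def fill_from_default_args(default_args: Dict[str, Any], **task_kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: explicit task arguments win, default_args fill the gaps."""
    merged = dict(default_args)
    merged.update(task_kwargs)
    return merged


# Shared settings declared once for the whole DAG (illustrative values).
default_args = {"virtual_cluster_id": "vc-shared", "aws_conn_id": "aws_default"}

# One task relies on the shared value, the other overrides it.
job_a = fill_from_default_args(default_args, name="pi-a")
job_b = fill_from_default_args(default_args, name="pi-b", virtual_cluster_id="vc-other")

print(job_a["virtual_cluster_id"])
print(job_b["virtual_cluster_id"])
```

With this pattern the cluster ID is written once per DAG, which covers the same duplication concern as reading it from the connection extras.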







[GitHub] [airflow] dacort commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666336230



##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+DEFAULT_ARGS = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["test@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+}

Review comment:
       @ashb Thanks for the additional review! I'll go ahead and get those changes addressed today.







[GitHub] [airflow] ashb commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r666054773



##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+DEFAULT_ARGS = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["test@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+}

Review comment:
       We're in the process of updating the examples to stop using `default_args`, since most of them don't need it, so
   
   ```suggestion
   ```

##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+DEFAULT_ARGS = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["test@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+}
+
+# [START howto_operator_emr_eks_config]
+JOB_DRIVER_ARG = {
+    "sparkSubmitJobDriver": {
+        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
+        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
+    }
+}
+
+CONFIGURATION_OVERRIDES_ARG = {
+    "monitoringConfiguration": {
+        "cloudWatchMonitoringConfiguration": {
+            "logGroupName": "/aws/emr-eks-spark",
+            "logStreamNamePrefix": "airflow",
+        }
+    }
+}
+# [END howto_operator_emr_eks_config]
+
+with DAG(
+    dag_id='emr_eks_pi_job',
+    default_args=DEFAULT_ARGS,

Review comment:
       ```suggestion
   ```

##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,

Review comment:
       Also these aren't optional -- that would mean passing `None` is valid (`Optional[int]` == `Union[int, None]`)
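
A small check of that equivalence, using only the standard `typing` module. The `poll` function is a hypothetical stand-in for the operator signature, not code from this PR: `sleep_time` has a default but is not `Optional`, while `max_tries` genuinely accepts `None`.

```python
from typing import Optional, Union, get_args, get_type_hints


def poll(sleep_time: int = 30, max_tries: Optional[int] = None) -> None:
    """Hypothetical signature: a default value alone does not make a parameter Optional."""


hints = get_type_hints(poll)

# Optional[X] is shorthand for Union[X, None].
same = Optional[int] == Union[int, None]

# Only max_tries admits None; sleep_time is a plain int that happens to have a default.
max_tries_allows_none = type(None) in get_args(hints["max_tries"])
sleep_time_allows_none = type(None) in get_args(hints["sleep_time"])

print(same, max_tries_allows_none, sleep_time_allows_none)
```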

##########
File path: airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for an Amazon EMR on EKS Spark job.
+"""
+import os
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_emr_eks_env_variables]
+VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
+JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
+# [END howto_operator_emr_eks_env_variables]
+
+DEFAULT_ARGS = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["test@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+}
+
+# [START howto_operator_emr_eks_config]
+JOB_DRIVER_ARG = {
+    "sparkSubmitJobDriver": {
+        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
+        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
+    }
+}
+
+CONFIGURATION_OVERRIDES_ARG = {
+    "monitoringConfiguration": {
+        "cloudWatchMonitoringConfiguration": {
+            "logGroupName": "/aws/emr-eks-spark",
+            "logStreamNamePrefix": "airflow",
+        }
+    }
+}
+# [END howto_operator_emr_eks_config]
+
+with DAG(
+    dag_id='emr_eks_pi_job',
+    default_args=DEFAULT_ARGS,
+    dagrun_timeout=timedelta(hours=2),
+    start_date=days_ago(1),
+    schedule_interval="@once",
+    tags=["emr_containers"],

Review comment:
       I think we include the `example` tag on all example DAGs?
   
   ```suggestion
       tags=["emr_containers", "example"],
   ```

##########
File path: airflow/providers/amazon/aws/operators/emr_containers.py
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+
+try:
+    from functools import cached_property
+except ImportError:
+    from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class EMRContainerOperator(BaseOperator):
+    """
+    An operator that submits jobs to EMR on EKS virtual clusters.
+
+    :param name: The name of the job run.
+    :type name: str
+    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
+    :type virtual_cluster_id: str
+    :param execution_role_arn: The IAM role ARN associated with the job run.
+    :type execution_role_arn: str
+    :param release_label: The Amazon EMR release version to use for the job run.
+    :type release_label: str
+    :param job_driver: Job configuration details, e.g. the Spark job parameters.
+    :type job_driver: dict
+    :param configuration_overrides: The configuration overrides for the job run,
+        specifically either application configuration or monitoring configuration.
+    :type configuration_overrides: dict
+    :param client_request_token: The client idempotency token of the job run request.
+        Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        If no token is provided, a UUIDv4 token will be generated for you.
+    :type client_request_token: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :type aws_conn_id: str
+    :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on EMR
+    :type sleep_time: int
+    :param max_tries: Maximum number of times to wait for the job run to finish.
+        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
+    :type max_tries: int
+    """
+
+    template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
+    ui_color = "#f9c915"
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        *,
+        name: str,
+        virtual_cluster_id: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+        aws_conn_id: str = "aws_default",
+        sleep_time: int = 30,
+        max_tries: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.virtual_cluster_id = virtual_cluster_id
+        self.execution_role_arn = execution_role_arn
+        self.release_label = release_label
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides or {}
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.sleep_time = sleep_time
+        self.max_tries = max_tries
+        self.job_id = None
+
+    @cached_property
+    def hook(self) -> EMRContainerHook:
+        """Create and return an EMRContainerHook."""
+        return EMRContainerHook(
+            self.aws_conn_id,
+            virtual_cluster_id=self.virtual_cluster_id,
+        )
+
+    def execute(self, context: dict) -> Optional[str]:
+        """Run job on EMR Containers"""
+        self.job_id = self.hook.submit_job(
+            self.name,
+            self.execution_role_arn,
+            self.release_label,
+            self.job_driver,
+            self.configuration_overrides,
+            self.client_request_token,
+        )
+        query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.sleep_time)
+
+        if query_status in EMRContainerHook.FAILURE_STATES:
+            # self.hook.get_state_change_reason(self.query_execution_id)
+            error_message = "BEEP BOOP"
+            raise AirflowException(
+                "EMR Containers job failed. Final state is {}, query_execution_id is {}. Error: {}".format(
+                    query_status, self.job_id, error_message
+                )
+            )
+        elif not query_status or query_status in EMRContainerHook.INTERMEDIATE_STATES:
+            raise AirflowException(
+                "Final state of EMR Containers job is {}. "
+                "Max tries of poll status exceeded, query_execution_id is {}.".format(
+                    query_status, self.job_id
+                )
+            )
+
+        return self.job_id
+
+    def on_kill(self) -> None:
+        """Cancel the submitted job run"""
+        if self.job_id:
+            self.log.info("Stopping job run with jobId - %s", self.job_id)
+            response = self.hook.stop_query(self.job_id)
+            http_status_code = None
+            try:
+                http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
+            except Exception as ex:  # pylint: disable=broad-except

Review comment:
       ```suggestion
               except Exception as ex:
   ```
   
   Pylint has been removed from Airflow's checks, so the `# pylint: disable` comment is no longer needed.







[GitHub] [airflow] ferruzzi commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r665519796



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.get_conn().start_job_run(**params)

Review comment:
       Here and elsewhere:  `self.get_conn()` is being deprecated.  Will this work with the preferred `self.conn`?
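
For context, a minimal sketch of why the swap should be behavior-preserving, assuming `get_conn()` is only a thin wrapper around a cached `conn` property, as it is in `AwsBaseHook`. `SketchHook` and `_build_client` are hypothetical stand-ins rather than Airflow classes, and `functools.cached_property` requires Python 3.8 or later.

```python
from functools import cached_property


class SketchHook:
    """Hypothetical stand-in for a hook exposing both access styles."""

    def __init__(self) -> None:
        self.clients_built = 0

    def _build_client(self) -> object:
        # Placeholder for creating a real service client.
        self.clients_built += 1
        return object()

    @cached_property
    def conn(self) -> object:
        # Built on first access, then reused on every later access.
        return self._build_client()

    def get_conn(self) -> object:
        # Compatibility wrapper that defers to the cached property.
        return self.conn


hook = SketchHook()
same_client = hook.conn is hook.get_conn()
print(same_client, hook.clients_built)
```

Under that assumption both spellings hand back the same client object, so replacing `self.get_conn()` with `self.conn` changes only the call style.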







[GitHub] [airflow] mik-laj commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-885272202


   We currently have one operator that lets us run Spark jobs on Kubernetes: [SparkKubernetesOperator](https://github.com/apache/airflow/blob/d72b363929c86eb03fc9583002459bd10bc7eaeb/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py#L24). It works with both EKS and GCP as well as any other Kubernetes platform. Why would anyone use this operator instead of the generic operator for Kubernetes?





[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r667308764



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)

Review comment:
       Can you tell me a little more about it?
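
For context on the TODO above, a minimal sketch of one way the loop could be tightened: stop only when the state is in an explicit set of final states, rather than whenever it is not in `INTERMEDIATE_STATES`. The state tuples are copied from the hook in this diff. The helper name `poll_until_final`, the `check_status` callable, and the choice to return `None` when `max_tries` is exhausted are assumptions made for illustration, not part of the PR.

```python
from time import sleep
from typing import Callable, Optional

# State tuples copied from EMRContainerHook in this diff.
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
SUCCESS_STATES = ("COMPLETED",)
FINAL_STATES = FAILURE_STATES + SUCCESS_STATES


def poll_until_final(
    check_status: Callable[[], Optional[str]],  # hypothetical stand-in for check_query_status
    max_tries: Optional[int] = None,
    poll_interval: float = 30,
) -> Optional[str]:
    """Poll until an explicitly known final state is seen.

    None (a swallowed client error) and any unrecognised state are treated
    as "not finished yet" instead of ending the loop. Returns None if
    max_tries is exhausted first (an assumption of this sketch).
    """
    try_number = 1
    while True:
        state = check_status()
        if state in FINAL_STATES:
            return state
        if max_tries is not None and try_number >= max_tries:
            return None
        try_number += 1
        sleep(poll_interval)
```

With this shape, a state value the hook does not know about keeps the loop polling until `max_tries` is reached, instead of being reported as the final state.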







[GitHub] [airflow] dacort commented on pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
dacort commented on pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#issuecomment-872603009


   Digging into the failures. 🙃 





[GitHub] [airflow] mik-laj commented on a change in pull request #16766: Add an Amazon EMR on EKS provider package

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16766:
URL: https://github.com/apache/airflow/pull/16766#discussion_r668251931



##########
File path: airflow/providers/amazon/aws/hooks/emr_containers.py
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from typing import Any, Dict, Optional
+
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EMRContainerHook(AwsBaseHook):
+    """
+    Interact with AWS EMR Virtual Cluster to run, poll jobs and return job status
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster
+    :type virtual_cluster_id: str
+    """
+
+    INTERMEDIATE_STATES = (
+        "PENDING",
+        "SUBMITTED",
+        "RUNNING",
+    )
+    FAILURE_STATES = (
+        "FAILED",
+        "CANCELLED",
+        "CANCEL_PENDING",
+    )
+    SUCCESS_STATES = ("COMPLETED",)
+
+    def __init__(self, *args: Any, virtual_cluster_id: Optional[str] = None, **kwargs: Any) -> None:
+        super().__init__(client_type="emr-containers", *args, **kwargs)  # type: ignore
+        self.virtual_cluster_id = self._get_virtual_cluster_id(virtual_cluster_id, self.aws_conn_id)
+
+    def _get_virtual_cluster_id(self, virtual_cluster_id: str, aws_conn_id: str):
+        if virtual_cluster_id is not None:
+            return virtual_cluster_id
+
+        if aws_conn_id is not None:
+            conn = self.get_connection(aws_conn_id)
+            cluster_id = conn.extra_dejson.get('virtual_cluster_id')
+            if cluster_id:
+                return cluster_id
+            else:
+                raise AirflowException("Missing virtual_cluster_id in AWS connection")
+
+        raise AirflowException(
+            f"Cannot get EMR virtual cluster ID: Please pass `virtual_cluster_id` or set it in connection JSON: {aws_conn_id}"  # noqa: E501
+        )
+
+    def submit_job(
+        self,
+        name: str,
+        execution_role_arn: str,
+        release_label: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict] = None,
+        client_request_token: Optional[str] = None,
+    ) -> str:
+        """
+        Submit a job to the EMR Containers API and return the job ID.
+        A job run is a unit of work, such as a Spark jar, PySpark script,
+        or SparkSQL query, that you submit to Amazon EMR on EKS.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.start_job_run  # noqa: E501
+
+        :param name: The name of the job run.
+        :type name: str
+        :param execution_role_arn: The IAM role ARN associated with the job run.
+        :type execution_role_arn: str
+        :param release_label: The Amazon EMR release version to use for the job run.
+        :type release_label: str
+        :param job_driver: Job configuration details, e.g. the Spark job parameters.
+        :type job_driver: dict
+        :param configuration_overrides: The configuration overrides for the job run,
+            specifically either application configuration or monitoring configuration.
+        :type configuration_overrides: dict
+        :param client_request_token: The client idempotency token of the job run request.
+            Use this if you want to specify a unique ID to prevent two jobs from getting started.
+        :type client_request_token: str
+        :return: Job ID
+        """
+        params = {
+            "name": name,
+            "virtualClusterId": self.virtual_cluster_id,
+            "executionRoleArn": execution_role_arn,
+            "releaseLabel": release_label,
+            "jobDriver": job_driver,
+            "configurationOverrides": configuration_overrides or {},
+        }
+        if client_request_token:
+            params["clientToken"] = client_request_token
+
+        response = self.conn.start_job_run(**params)
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Start Job Run failed: {response}')
+        else:
+            self.log.info(
+                f"Start Job Run success - Job Id {response['id']} and virtual cluster id {response['virtualClusterId']}"  # noqa: E501
+            )
+            return response['id']
+
+    def get_job_failure_reason(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.
+
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        # We absorb any errors if we can't retrieve the job status
+        reason = None
+
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            reason = response['jobRun']['failureReason']
+        except KeyError:
+            self.log.error('Could not get status of the EMR on EKS job')
+        except ClientError as ex:
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+
+        return reason
+
+    def check_query_status(self, job_id: str) -> Optional[str]:
+        """
+        Fetch the status of submitted job run. Returns None or one of valid query states.
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html#EMRContainers.Client.describe_job_run  # noqa: E501
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :return: str
+        """
+        try:
+            response = self.conn.describe_job_run(
+                virtualClusterId=self.virtual_cluster_id,
+                id=job_id,
+            )
+            return response["jobRun"]["state"]
+        except self.conn.exceptions.ResourceNotFoundException:
+            # If the job is not found, we raise an exception as something fatal has happened.
+            raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
+        except ClientError as ex:
+            # If we receive a generic ClientError, we swallow the exception so that the
+            self.log.error('AWS request failed, check logs for more info: %s', ex)
+            return None
+
+    def poll_query_status(
+        self, job_id: str, max_tries: Optional[int] = None, poll_interval: int = 30
+    ) -> Optional[str]:
+        """
+        Poll the status of submitted job run until query state reaches final state.
+        Returns one of the final states.
+        :param job_id: Id of submitted job run
+        :type job_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
+        :type poll_interval: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+
+        # TODO: Make this logic a little bit more robust.
+        # Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
+        # While that should work in most cases...it might not. :)
+        while True:
+            query_state = self.check_query_status(job_id)
+            if query_state is None:
+                self.log.info(f"Try {try_number}: Invalid query state. Retrying again")
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")
+            else:
+                self.log.info(f"Try {try_number}: Query execution completed. Final state is {query_state}")

Review comment:
       This is not a question of performance, but mostly a question of usability.
   
   If you send data to the logger as a text template plus arguments, some services (e.g. Logentries, Logstash, Stackdriver) can store the data in two separate columns, i.e. unrendered. You can use this to filter for occurrences of a given log event, regardless of what the arguments are.
   
   This also gives a clearer result when displaying text on consoles. All arguments are bolded.
   <img width="504" alt="Screenshot 2021-07-12 at 22 48 10" src="https://user-images.githubusercontent.com/12058428/125353768-442d0280-e363-11eb-8ff2-646e9bef471f.png">
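
To illustrate the point above, a small sketch using only the standard library `logging` module. The in-memory `ListHandler` class and the logger name are made up for this example. It shows that a template-plus-arguments call keeps the unrendered template and the arguments as separate fields on the log record, while an f-string call hands the logger an already rendered string.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps records in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


log = logging.getLogger("emr_containers_logging_example")
log.setLevel(logging.INFO)
log.propagate = False  # keep the example output out of the root logger
handler = ListHandler()
log.addHandler(handler)

try_number, query_state = 3, "RUNNING"

# f-string: the text is rendered before the logger ever sees it.
log.info(f"Try {try_number}: Query is still in an intermediate state - {query_state}")

# Template + arguments: rendering is deferred, and both parts stay available.
log.info("Try %s: Query is still in an intermediate state - %s", try_number, query_state)

eager, lazy = handler.records
print(repr(eager.msg), eager.args)  # rendered text, empty args
print(repr(lazy.msg), lazy.args)    # constant template, separate args
```

Both records render to the same final message, but only the second keeps a constant `msg` that a log backend could group or filter on.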
   
   



