Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/09 14:31:24 UTC

[GitHub] [airflow] nikhi-suthar opened a new pull request, #26269: Added Feature - Glue job continuously log printing in Airflow task logs

nikhi-suthar opened a new pull request, #26269:
URL: https://github.com/apache/airflow/pull/26269

   Added a continuous logging feature for Glue jobs and fixed an existing bug where CloudWatch logs were not printed.
   
   The following changes were made:
   
   - **GlueJobOperator**
        - Added a new input parameter, **`continuous_logging`**, for continuous logging (see the usage sketch below). Default value is `False`.
   
   - **GlueJobHook**
       - Added a new input parameter, **`continuous_logging`**, for continuous logging in the **`job_completion`** method.
       - Added logic to print the available **CloudWatch** logs to the Airflow logs.
       - Added a new method, **`print_output_logs`**, to handle continuous logging.
       - Fixed the existing bug where the **`print_job_logs`** method was never executed.
       - Changed selective log printing from the filter mechanism (`filter_log_events`) to the **`get_log_events`** method.
       - Added conditions for the **Batch**, **Continue**, and **no logging** modes in the **`job_completion`** method.
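   
   A minimal usage sketch of the proposed operator parameter (job name and script location are illustrative; `continuous_logging` is the parameter added by this PR):
   
   ```python
   from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
   
   # Hypothetical task using the parameter proposed in this PR.
   # continuous_logging only takes effect when verbose is also True.
   submit_glue_job = GlueJobOperator(
       task_id="submit_glue_job",
       job_name="my_glue_job",                    # illustrative
       script_location="s3://my-bucket/etl.py",   # illustrative
       verbose=True,
       continuous_logging=True,  # stream CloudWatch output logs into the task log
   )
   ```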
   
   
   
   
   Related: #26203, https://github.com/apache/airflow/issues/26196




[GitHub] [airflow] ferruzzi commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r970083118


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)

Review Comment:
   Why the change from `filter_log_events` to `get_log_events`? If you're going to do that, then also drop the filter constants on L33-34
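   
   For reference, a minimal sketch of the two boto3 calls in question (log group and stream names are illustrative):
   
   ```python
   import boto3
   
   logs = boto3.client("logs")
   
   # filter_log_events: searches one or more streams and supports a filter pattern
   filtered = logs.filter_log_events(
       logGroupName="/aws-glue/jobs/output",    # illustrative
       logStreamNames=["jr_example_run_id"],    # illustrative
       filterPattern="ERROR",
   )
   
   # get_log_events: reads a single stream in order, with no filtering
   raw = logs.get_log_events(
       logGroupName="/aws-glue/jobs/output",
       logStreamName="jr_example_run_id",
   )
   ```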





[GitHub] [airflow] ferruzzi commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r970098750


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output CloudWatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion) otherwise failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+        else:
+            self.log.info("Logging is disabled.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+                if mode == 'CONTINUE':
+                    log_group_name = job_run['JobRun']['LogGroupName']
+                    token = self.print_output_logs(
                         job_name=job_name,
                         run_id=run_id,
-                        job_failed=job_failed,
+                        log_group_name=log_group_name,
+                        log_client=log_client,
                         next_token=next_log_token,
                     )
+                    while token is not None:
+                        next_log_token = token
+                        token = self.print_output_logs(
+                            job_name=job_name,
+                            run_id=run_id,
+                            log_group_name=log_group_name,
+                            log_client=log_client,
+                            next_token=next_log_token,
+                        )
+                elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : {job_run_state}"

Review Comment:
   Is this not basically the same value that is already assigned on L254? Do we need this assignment here?
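   
   If the values are effectively the same, the branch could presumably just log the existing value, something like (a sketch):
   
   ```python
   elif mode == 'BATCH':
       self.log.info(msg)
   ```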





[GitHub] [airflow] Dearkano commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by "Dearkano (via GitHub)" <gi...@apache.org>.
Dearkano commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1414537107

   Hi, is there any update on this PR? Thanks!




[GitHub] [airflow] shubham22 commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
shubham22 commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1330028294

   @nikhi-suthar - checking once more, are you still working on this? cc: @o-nikolas 




[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979202278


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   I tried that, but the issue is that in the Glue connector we are already using one client for Glue from the AwsBaseHook object. For CloudWatch, I really do not want to create one more AwsBaseHook object just to get the boto3 client; that is the reason I have added it directly through boto3. Is that correct?





[GitHub] [airflow] github-actions[bot] commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1382598900

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r980232588


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   I have taken care of the log client via AwsBaseHook. It is working fine now. Please review and let me know if any further changes are required.





[GitHub] [airflow] ferruzzi commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r970089160


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output CloudWatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion) otherwise failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+        else:
+            self.log.info("Logging is disabled.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+                if mode == 'CONTINUE':
+                    log_group_name = job_run['JobRun']['LogGroupName']
+                    token = self.print_output_logs(
                         job_name=job_name,
                         run_id=run_id,
-                        job_failed=job_failed,
+                        log_group_name=log_group_name,
+                        log_client=log_client,
                         next_token=next_log_token,
                     )
+                    while token is not None:
+                        next_log_token = token
+                        token = self.print_output_logs(
+                            job_name=job_name,
+                            run_id=run_id,
+                            log_group_name=log_group_name,
+                            log_client=log_client,
+                            next_token=next_log_token,
+                        )
+                elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : {job_run_state}"
+                    self.log.info(msg)
+                else:
+                    self.log.info(msg)

Review Comment:
   I know it's silly, but can you humour me and add a blank line here between the `if` blocks to visually separate them?
   ```suggestion
                       self.log.info(msg)
                       
   ```





[GitHub] [airflow] potiuk commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1264537193

   For some reason the `from __future__ import annotations` import is duplicated?




[GitHub] [airflow] Taragolis commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r969491629


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   BTW, in your case you do not pass `aws_session_token` to the client. If the user follows AWS best practices and uses temporary credentials obtained via a role ARN, an EC2 instance IAM role, an ECS task IAM role, EKS IRSA, etc., then the token must be provided, otherwise an error will be raised.
   
   As I understand it, the token is only optional for permanent credentials.
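   
   A minimal sketch of what that could look like (assuming the credentials object returned by `get_credentials` exposes a `token` attribute, as botocore's read-only credentials do):
   
   ```python
   credentials = self.get_credentials(region_name=self.conn_region_name)
   log_client = boto3.client(
       'logs',
       region_name=self.conn_region_name,
       aws_access_key_id=credentials.access_key,
       aws_secret_access_key=credentials.secret_key,
       aws_session_token=credentials.token,  # None for permanent credentials
   )
   ```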



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output CloudWatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion) otherwise failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   Same as above



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   Rather than using `boto3` directly, you might use the `AwsBaseHook.get_session` method instead, so this could be replaced by:
   
   ```python
   log_client = self.get_session(region_name=self.conn_region_name).client('logs')
   ```





[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979208718


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output CloudWatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion) otherwise failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+        else:
+            self.log.info("Logging is disabled.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+                if mode == 'CONTINUE':
+                    log_group_name = job_run['JobRun']['LogGroupName']
+                    token = self.print_output_logs(
                         job_name=job_name,
                         run_id=run_id,
-                        job_failed=job_failed,
+                        log_group_name=log_group_name,
+                        log_client=log_client,
                         next_token=next_log_token,
                     )
+                    while token is not None:
+                        next_log_token = token
+                        token = self.print_output_logs(
+                            job_name=job_name,
+                            run_id=run_id,
+                            log_group_name=log_group_name,
+                            log_client=log_client,
+                            next_token=next_log_token,
+                        )
+                elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : {job_run_state}"
+                    self.log.info(msg)
+                else:
+                    self.log.info(msg)

Review Comment:
   Sure, will do.





[GitHub] [airflow] o-nikolas commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1281560737

   @nikhi-suthar 
   Are you still attending to this PR? It looks like the Glue tests are timing out after your changes.




[GitHub] [airflow] ferruzzi commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r970086070


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output CloudWatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion) otherwise failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+        else:
+            self.log.info("Logging is disabled.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']

Review Comment:
   Any reason not to keep using the existing `get_job_state` helper here?
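   
   i.e., something like (a sketch; `get_job_state` is the helper already defined in this hook, while the `LogGroupName` lookup for CONTINUE mode could still come from a separate `get_job_run` call):
   
   ```python
   # Reuse the existing helper instead of unpacking get_job_run manually
   job_run_state = self.get_job_state(job_name, run_id)
   ```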





[GitHub] [airflow] Taragolis commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r980510218


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -202,34 +204,52 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> D
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
         next_log_token = None
-        job_failed = False
+        glue_client = self.get_conn()
+        cw_logs = AwsLogsHook()

Review Comment:
   Without any parameters, the hook will use the default boto3 strategy, and `cw_logs.get_conn()` turns into `boto3.client("logs")`.
   
   ```suggestion
           cw_logs = AwsLogsHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
   ```
   
   But it is really nice that you moved the code related to AWS CloudWatch into the `AwsLogsHook`.
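   
   As a usage sketch of that suggestion (connection id, region, log group, and stream are placeholders, and the `get_log_events` generator signature is assumed from the provider at the time of this PR):
   
   ```python
   from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
   
   # Reuse the Glue hook's connection settings so CloudWatch calls run
   # under the same credentials and region as the Glue API calls.
   cw_logs = AwsLogsHook(aws_conn_id="aws_default", region_name="us-east-1")
   for event in cw_logs.get_log_events(
       log_group="/aws-glue/jobs/output",       # placeholder log group
       log_stream_name="jr_0123456789abcdef",   # placeholder Glue run id
   ):
       print(event["message"])
   ```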
   





[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979203551


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)

Review Comment:
   `filter_log_events` will not return the logs, and in the case of a custom log it will not be useful either. I have tested `get_log_events` and it is working fine. I agree with you regarding the extra filter constants; I will drop them and push new changes.
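   
   For reference, a minimal standalone sketch of `get_log_events` pagination (plain boto3 usage, not the PR's code; log group and stream names are placeholders):
   
   ```python
   import boto3
   
   client = boto3.client("logs")
   token = None
   while True:
       kwargs = {
           "logGroupName": "/aws-glue/jobs/output",    # placeholder group
           "logStreamName": "jr_0123456789abcdef",     # placeholder Glue run id
           "startFromHead": True,
       }
       if token:
           kwargs["nextToken"] = token
       response = client.get_log_events(**kwargs)
       for event in response["events"]:
           print(event["message"])
       # get_log_events returns the same forward token once the end of the
       # stream has been reached; that is the conventional stop condition.
       if response["nextForwardToken"] == token:
           break
       token = response["nextForwardToken"]
   ```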





[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r988658207


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -17,12 +17,15 @@
 # under the License.
 from __future__ import annotations
 
+from __future__ import annotations
+

Review Comment:
   Change Done



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -17,12 +17,15 @@
 # under the License.
 from __future__ import annotations
 
+from __future__ import annotations
+

Review Comment:
   Changes Done





[GitHub] [airflow] github-actions[bot] closed pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs 
URL: https://github.com/apache/airflow/pull/26269




[GitHub] [airflow] potiuk commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1436141544

   > Hi is there any update on this PR, thanks!
   
   Not really - as you see it's closed. But if you want to pick it up where the original author left it - feel free to continue in a new PR.




[GitHub] [airflow] Taragolis commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979213347


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   > creating a new client for logs from the same AWSBaseHook class is not possible
   
   Yes, you cannot create a different client type by calling `AWSBaseHook.get_client_type` since the latest changes.
   
   https://github.com/apache/airflow/blob/3b6176929ccc0351554e5331d920c4ec65b33a38/airflow/providers/amazon/aws/hooks/base_aws.py#L438-L454
   
   But you can still create a `boto3.session.Session()` and use that session to create whatever client you want
   
   https://github.com/apache/airflow/blob/3b6176929ccc0351554e5331d920c4ec65b33a38/airflow/providers/amazon/aws/hooks/base_aws.py#L432-L436
   
   **Generic approach for boto3 client**
   ```python
   session = boto3.session.Session(...)
   client = session.client("awesome-client", ...)
   ```
   
   **How it can be achieved with `AwsGenericHook` / `AWSBaseHook` and any of their children**
   ```python
   hook = GlueJobHook(aws_conn_id="awesome_aws_conn_id", ...)
   session = hook.get_session(...)
   client = session.client("awesome-client", ...)
   ```
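   
   A concrete instance of that pattern for the CloudWatch Logs client discussed in this PR might look like this sketch (connection id and region are placeholders):
   
   ```python
   from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
   
   hook = GlueJobHook(aws_conn_id="aws_default", region_name="us-east-1")
   session = hook.get_session(region_name="us-east-1")
   # The logs client now shares credentials and region with the Glue client.
   log_client = session.client("logs")
   ```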





[GitHub] [airflow] nikhi-suthar commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1265661608

   > For some reason the `from __future__ import annotations` import is duplicated?
   
   Could you please help me get the name of the file? I guess a pre-commit hook might have added it.




[GitHub] [airflow] potiuk commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1250400136

   Needs conflict resolution and answers to @ferruzzi's comments.




[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979202278


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   I tried that, but the issue is that in the Glue connector we are already using one Glue client from the AWSBaseHook object, so for CloudWatch I really did not want to create one more AWSBaseHook object to get the boto3 client; that is the reason I added it directly through boto3.





[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979209679


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the job.
+            (default: False)
+        :param continuous_logging: If True (and verbose is also True), then print the Glue job output CloudWatch
+            logs continuously (once the log stream is available) in the Airflow Task Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion), otherwise a failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+        else:
+            self.log.info("Logging is disabled.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+                if mode == 'CONTINUE':
+                    log_group_name = job_run['JobRun']['LogGroupName']
+                    token = self.print_output_logs(
                         job_name=job_name,
                         run_id=run_id,
-                        job_failed=job_failed,
+                        log_group_name=log_group_name,
+                        log_client=log_client,
                         next_token=next_log_token,
                     )
+                    while token is not None:
+                        next_log_token = token
+                        token = self.print_output_logs(
+                            job_name=job_name,
+                            run_id=run_id,
+                            log_group_name=log_group_name,
+                            log_client=log_client,
+                            next_token=next_log_token,
+                        )
+                elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : {job_run_state}"

Review Comment:
   I had added it for other use cases, but it is not currently required. I will remove it.





[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979207995


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   In the existing Glue hook, we are using boto3 directly for log_client. I guess the reason for that is a conflict on kwargs["client_type"]: since we are already using a Glue client from AWSBaseHook in the same code, creating a new client for logs from the same AWSBaseHook class is not possible. I agree with your point that it might cause issues with role-based access; let me check how we can solve it. I guess I should declare an `AwsLogsHook` object and get a session from that instead. I will try that and push new changes soon. Meanwhile, if you have any other workaround, please feel free to share it.





[GitHub] [airflow] Taragolis commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r986054074


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -17,12 +17,15 @@
 # under the License.
 from __future__ import annotations
 
+from __future__ import annotations
+

Review Comment:
   @nikhi-suthar this one





[GitHub] [airflow] ferruzzi commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1258547195

   Changes and replies look good to me.  Thanks for those, and for your patience. :+1: 




[GitHub] [airflow] nikhi-suthar commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r981206376


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -202,34 +204,52 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> D
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
         next_log_token = None
-        job_failed = False
+        glue_client = self.get_conn()
+        cw_logs = AwsLogsHook()

Review Comment:
   Thanks, @Taragolis. I have made the changes as per your suggestion.





[GitHub] [airflow] potiuk commented on pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26269:
URL: https://github.com/apache/airflow/pull/26269#issuecomment-1272933821

   But tests are failing.

