Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/13 11:17:53 UTC

[GitHub] [airflow] Taragolis commented on a diff in pull request #26269: Added Feature - Glue job continuously log printing in Airflow task logs

Taragolis commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r969491629


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   BTW, in your case you do not provide `aws_session_token` to the client. If the user follows AWS best practices and uses temporary credentials obtained via `role_arn`, an EC2 Instance IAM role, an ECS Task IAM Role, IRSA on EKS, etc., then the token must be provided, otherwise an error will be raised.
   
   I believe the token can be omitted only for permanent credentials.
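   
   For example, a minimal sketch of what I mean (assuming `credentials` is the `ReadOnlyCredentials` tuple returned by `self.get_credentials()`, which also carries a `token` attribute):
   
   ```python
   # Pass the session token so temporary credentials work as well;
   # for permanent credentials, credentials.token is None, which boto3 accepts.
   log_client = boto3.client(
       'logs',
       region_name=self.conn_region_name,
       aws_access_key_id=credentials.access_key,
       aws_secret_access_key=credentials.secret_key,
       aws_session_token=credentials.token,
   )
   ```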



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs are shown in the Airflow Task Logs after the job completes.
+        (default: False)
+        :param continuous_logging: If True (and verbose is also True), print the Glue job output CloudWatch logs
+        continuously (once the log stream is available) to the Airflow Task Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId after completion; raises AirflowException on failure.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   Same as above: `aws_session_token` is missing from this client as well.



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )

Review Comment:
   Maybe, rather than using `boto3` directly, use the `AwsBaseHook.get_session` method instead; then this could be replaced by:
   
   ```python
   log_client = self.get_session(region_name=self.conn_region_name).client('logs')
   ```
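   
   The session is built from the hook's connection, so it carries the region and the full credential chain (including `aws_session_token` when temporary credentials are in use), which would also take care of the concern above.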


