Posted to commits@airflow.apache.org by "dimberman (via GitHub)" <gi...@apache.org> on 2023/02/23 20:07:52 UTC

[GitHub] [airflow] dimberman commented on a diff in pull request #29522: Add support in AWS Batch Operator for multinode jobs

dimberman commented on code in PR #29522:
URL: https://github.com/apache/airflow/pull/29522#discussion_r1116193774


##########
airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -174,18 +209,27 @@ def submit_job(self, context: Context):
             self.job_definition,
             self.job_queue,
         )
-        self.log.info("AWS Batch job - container overrides: %s", self.overrides)
+
+        if self.container_overrides:
+            self.log.info("AWS Batch job - container overrides: %s", self.container_overrides)
+        if self.array_properties:
+            self.log.info("AWS Batch job - array properties: %s", self.array_properties)
+        if self.node_overrides:
+            self.log.info("AWS Batch job - node properties: %s", self.node_overrides)

Review Comment:
   Should this always be printed? Are there any security or verbosity concerns here?
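
   A minimal sketch of one way the verbosity/security concern could be handled: mask environment values before logging, since container overrides may carry secrets in their `environment` list. The helper below is hypothetical and not part of this PR.

   ```
   def redact_container_overrides(overrides: dict) -> dict:
       """Return a copy of container overrides with environment values masked.

       Hypothetical helper, not from the PR: AWS Batch container overrides can
       carry secrets in the "environment" list, so mask the values before logging.
       """
       redacted = dict(overrides)
       if "environment" in redacted:
           redacted["environment"] = [
               {**env, "value": "***"} for env in redacted["environment"]
           ]
       return redacted
   ```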



##########
airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -133,17 +143,42 @@ def __init__(
         self.job_name = job_name
         self.job_definition = job_definition
         self.job_queue = job_queue
-        self.overrides = overrides or {}
-        self.array_properties = array_properties or {}
+
+        self.container_overrides = None
+        if overrides:
+            self.container_overrides = overrides
+            warnings.warn(
+                "Parameter `overrides` is deprecated, Please use `container_overrides` instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if container_overrides:
+                raise AirflowException(
+                    "If providing `container_overrides`, then old parameter 'overrides' should be removed."
+                )

Review Comment:
   Can you add a comment explaining what's happening here?
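
   A standalone illustration of the backward-compatibility handling this comment asks to be documented. The function name is hypothetical and ValueError stands in for the AirflowException used in the PR; the control flow mirrors the diff: accept the deprecated `overrides` parameter, warn about it, and reject calls that pass both the old and the new name.

   ```
   import warnings


   def resolve_container_overrides(overrides=None, container_overrides=None):
       # Hypothetical standalone helper, not the operator itself: keep accepting
       # the deprecated `overrides` parameter for backward compatibility, emit a
       # DeprecationWarning, and refuse ambiguous calls that supply both names.
       if overrides:
           warnings.warn(
               "Parameter `overrides` is deprecated, please use `container_overrides` instead.",
               DeprecationWarning,
               stacklevel=2,
           )
           if container_overrides:
               raise ValueError(
                   "If providing `container_overrides`, the deprecated `overrides` must be removed."
               )
           return overrides
       return container_overrides
   ```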



##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -419,8 +419,46 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
 
         :param job_id: AWS Batch Job ID
         """
-        job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
-        log_configuration = job_container_desc.get("logConfiguration", {})
+        job_desc = self.get_job_description(job_id=job_id)
+
+        job_node_properties = job_desc.get("nodeProperties", {})
+        job_container_desc = job_desc.get("container", {})
+
+        if job_node_properties:
+            job_node_range_properties = job_node_properties.get("nodeRangeProperties", {})
+            if len(job_node_range_properties) > 1:
+                self.log.warning(
+                    "AWS Batch job (%s) has more than one node group. Only returning logs from first group.",
+                    job_id,
+                )
+            if not job_node_range_properties:
+                raise AirflowException(
+                    "AWS Batch job (%s) has no node group. It was described as such:\n%s", job_id, job_desc
+                )
+
+            log_configuration = job_node_range_properties[0].get("container", {}).get("logConfiguration", {})
+            # "logStreamName" value is not available in the "container" object for multinode jobs --
+            # it is available in the "attempts" object
+            job_attempts = job_desc.get("attempts", [])
+            if len(job_attempts):
+                if len(job_attempts) > 1:
+                    self.log.warning(
+                        "AWS Batch job (%s) has had more than one attempt. "
+                        "Only returning logs from the most recent attempt.",
+                        job_id,
+                    )
+                awslogs_stream_name = job_attempts[-1].get("container", {}).get("logStreamName")
+            else:
+                awslogs_stream_name = None

Review Comment:
   There is a lot of nested code here. Can you break this out into functions?
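
   A sketch of how the first layer of nesting could be pulled into its own function, as suggested. The name is illustrative and ValueError stands in for the AirflowException the hook would raise.

   ```
   def get_multinode_log_configuration(job_desc: dict) -> dict:
       """Illustrative helper (not from the PR): return the log configuration of
       the first node group of a multinode job description, or raise if the job
       has no node groups at all."""
       node_range_properties = job_desc.get("nodeProperties", {}).get("nodeRangeProperties", [])
       if not node_range_properties:
           raise ValueError(f"AWS Batch job has no node group. It was described as such:\n{job_desc}")
       return node_range_properties[0].get("container", {}).get("logConfiguration", {})
   ```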



##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -419,8 +419,46 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
 
         :param job_id: AWS Batch Job ID
         """
-        job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
-        log_configuration = job_container_desc.get("logConfiguration", {})
+        job_desc = self.get_job_description(job_id=job_id)

Review Comment:
   @vandonr-amz can you please add this context to the PR description? ^^^ It would be great if future users could immediately understand what's going on here.



##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -419,8 +419,46 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
 
         :param job_id: AWS Batch Job ID
         """
-        job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
-        log_configuration = job_container_desc.get("logConfiguration", {})
+        job_desc = self.get_job_description(job_id=job_id)
+
+        job_node_properties = job_desc.get("nodeProperties", {})
+        job_container_desc = job_desc.get("container", {})
+
+        if job_node_properties:
+            job_node_range_properties = job_node_properties.get("nodeRangeProperties", {})
+            if len(job_node_range_properties) > 1:
+                self.log.warning(
+                    "AWS Batch job (%s) has more than one node group. Only returning logs from first group.",
+                    job_id,
+                )
+            if not job_node_range_properties:
+                raise AirflowException(
+                    "AWS Batch job (%s) has no node group. It was described as such:\n%s", job_id, job_desc
+                )
+
+            log_configuration = job_node_range_properties[0].get("container", {}).get("logConfiguration", {})
+            # "logStreamName" value is not available in the "container" object for multinode jobs --
+            # it is available in the "attempts" object
+            job_attempts = job_desc.get("attempts", [])
+            if len(job_attempts):
+                if len(job_attempts) > 1:
+                    self.log.warning(
+                        "AWS Batch job (%s) has had more than one attempt. "
+                        "Only returning logs from the most recent attempt.",
+                        job_id,
+                    )
+                awslogs_stream_name = job_attempts[-1].get("container", {}).get("logStreamName")

Review Comment:
   You could save a layer of nesting by doing something like:
   
   ```
   if not job_attempts:
       awslogs_stream_name = None
   else:
       awslogs_stream_name = job_attempts[-1].get("container", {}).get("logStreamName")
   if len(job_attempts) > 1:
      # write warning
   ```
   
   You might have one line of repeated code.
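
   For reference, one way the suggestion could be completed into a self-contained function (names are illustrative, not code from the PR): warn once when there were multiple attempts, then take the log stream name from the most recent attempt without extra nesting.

   ```
   import logging

   log = logging.getLogger(__name__)


   def latest_attempt_stream_name(job_desc: dict, job_id: str):
       """Illustrative completion of the suggestion above: warn about extra
       attempts, then return the log stream name of the most recent attempt,
       or None when the job has not run yet."""
       attempts = job_desc.get("attempts", [])
       if len(attempts) > 1:
           log.warning(
               "AWS Batch job (%s) has had more than one attempt. "
               "Only returning logs from the most recent attempt.",
               job_id,
           )
       return attempts[-1].get("container", {}).get("logStreamName") if attempts else None
   ```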



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org