You are viewing a plain text version of this content. The canonical link to the original is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/09/21 22:36:06 UTC

[airflow] branch main updated: Update `BatchOperator` operator_extra_links property (#34506)

This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 04515ef008 Update `BatchOperator` operator_extra_links property (#34506)
04515ef008 is described below

commit 04515ef008852a8dd05cdca53f96a9d4fda034c1
Author: Asher <as...@gmail.com>
AuthorDate: Thu Sep 21 18:35:59 2023 -0400

    Update `BatchOperator` operator_extra_links property (#34506)
---
 airflow/providers/amazon/aws/operators/batch.py | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py
index a13a55d15d..23d245ae7b 100644
--- a/airflow/providers/amazon/aws/operators/batch.py
+++ b/airflow/providers/amazon/aws/operators/batch.py
@@ -32,6 +32,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
+from airflow.models.mappedoperator import MappedOperator
 from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
 from airflow.providers.amazon.aws.links.batch import (
     BatchJobDefinitionLink,
@@ -126,9 +127,21 @@ class BatchOperator(BaseOperator):
     @property
     def operator_extra_links(self):
         op_extra_links = [BatchJobDetailsLink()]
-        if self.wait_for_completion:
+
+        if isinstance(self, MappedOperator):
+            wait_for_completion = self.partial_kwargs.get(
+                "wait_for_completion"
+            ) or self.expand_input.value.get("wait_for_completion")
+            array_properties = self.partial_kwargs.get("array_properties") or self.expand_input.value.get(
+                "array_properties"
+            )
+        else:
+            wait_for_completion = self.wait_for_completion
+            array_properties = self.array_properties
+
+        if wait_for_completion:
             op_extra_links.extend([BatchJobDefinitionLink(), BatchJobQueueLink()])
-        if not self.array_properties:
+        if not array_properties:
             # There is no CloudWatch Link to the parent Batch Job available.
             op_extra_links.append(CloudWatchEventsLink())