You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/09/21 22:36:06 UTC
[airflow] branch main updated: Update `BatchOperator` operator_extra_links property (#34506)
This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04515ef008 Update `BatchOperator` operator_extra_links property (#34506)
04515ef008 is described below
commit 04515ef008852a8dd05cdca53f96a9d4fda034c1
Author: Asher <as...@gmail.com>
AuthorDate: Thu Sep 21 18:35:59 2023 -0400
Update `BatchOperator` operator_extra_links property (#34506)
---
airflow/providers/amazon/aws/operators/batch.py | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py
index a13a55d15d..23d245ae7b 100644
--- a/airflow/providers/amazon/aws/operators/batch.py
+++ b/airflow/providers/amazon/aws/operators/batch.py
@@ -32,6 +32,7 @@ from typing import TYPE_CHECKING, Any, Sequence
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
+from airflow.models.mappedoperator import MappedOperator
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.links.batch import (
BatchJobDefinitionLink,
@@ -126,9 +127,21 @@ class BatchOperator(BaseOperator):
@property
def operator_extra_links(self):
op_extra_links = [BatchJobDetailsLink()]
- if self.wait_for_completion:
+
+ if isinstance(self, MappedOperator):
+ wait_for_completion = self.partial_kwargs.get(
+ "wait_for_completion"
+ ) or self.expand_input.value.get("wait_for_completion")
+ array_properties = self.partial_kwargs.get("array_properties") or self.expand_input.value.get(
+ "array_properties"
+ )
+ else:
+ wait_for_completion = self.wait_for_completion
+ array_properties = self.array_properties
+
+ if wait_for_completion:
op_extra_links.extend([BatchJobDefinitionLink(), BatchJobQueueLink()])
- if not self.array_properties:
+ if not array_properties:
# There is no CloudWatch Link to the parent Batch Job available.
op_extra_links.append(CloudWatchEventsLink())