You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/12 14:45:09 UTC

[airflow] branch main updated: Fix MyPy errors in google.cloud.sensors (#20228)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f66257  Fix MyPy errors in google.cloud.sensors (#20228)
1f66257 is described below

commit 1f662571b2133df09da22aea35936bb10b8ebffa
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Dec 12 15:44:27 2021 +0100

    Fix MyPy errors in google.cloud.sensors (#20228)
    
    Part of #19891
---
 airflow/providers/google/cloud/hooks/datafusion.py     | 2 +-
 airflow/providers/google/cloud/sensors/bigquery_dts.py | 4 +++-
 airflow/providers/google/cloud/sensors/datafusion.py   | 2 +-
 airflow/providers/google/cloud/sensors/dataproc.py     | 4 ++--
 airflow/providers/google/cloud/sensors/workflows.py    | 7 +++++--
 5 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index b6321dc..3f2828a 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -407,7 +407,7 @@ class DataFusionHook(GoogleBaseHook):
         instance_url: str,
         pipeline_id: str,
         namespace: str = "default",
-    ) -> str:
+    ) -> Any:
         url = os.path.join(
             self._base_url(instance_url, namespace),
             quote(pipeline_name),
diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py b/airflow/providers/google/cloud/sensors/bigquery_dts.py
index 4b92cb4..8a782a4 100644
--- a/airflow/providers/google/cloud/sensors/bigquery_dts.py
+++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py
@@ -106,7 +106,9 @@ class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
         result = set()
         for state in states:
             if isinstance(state, str):
-                result.add(TransferState[state.upper()])
+                # The proto.Enum type is indexable (via its metaclass, and it is aliased), but MyPy cannot
+                # infer this; see https://github.com/python/mypy/issues/8968
+                result.add(TransferState[state.upper()])  # type: ignore[misc]
             elif isinstance(state, int):
                 result.add(TransferState(state))
             elif isinstance(state, TransferState):
diff --git a/airflow/providers/google/cloud/sensors/datafusion.py b/airflow/providers/google/cloud/sensors/datafusion.py
index 54c7e4e..6b3e6e1 100644
--- a/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/airflow/providers/google/cloud/sensors/datafusion.py
@@ -72,7 +72,7 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
         expected_statuses: Set[str],
         instance_name: str,
         location: str,
-        failure_statuses: Set[str] = None,
+        failure_statuses: Optional[Set[str]] = None,
         project_id: Optional[str] = None,
         namespace: str = "default",
         gcp_conn_id: str = 'google_cloud_default',
diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py
index 5f2b235..0fab0e4 100644
--- a/airflow/providers/google/cloud/sensors/dataproc.py
+++ b/airflow/providers/google/cloud/sensors/dataproc.py
@@ -56,7 +56,7 @@ class DataprocJobSensor(BaseSensorOperator):
         *,
         project_id: str,
         dataproc_job_id: str,
-        region: str = None,
+        region: Optional[str] = None,
         location: Optional[str] = None,
         gcp_conn_id: str = 'google_cloud_default',
         wait_timeout: Optional[int] = None,
@@ -79,7 +79,7 @@ class DataprocJobSensor(BaseSensorOperator):
         self.dataproc_job_id = dataproc_job_id
         self.region = region
         self.wait_timeout = wait_timeout
-        self.start_sensor_time = None
+        self.start_sensor_time: Optional[float] = None
 
     def execute(self, context: Dict):
         self.start_sensor_time = time.monotonic()
diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py
index 5950458..a1162a1 100644
--- a/airflow/providers/google/cloud/sensors/workflows.py
+++ b/airflow/providers/google/cloud/sensors/workflows.py
@@ -73,8 +73,11 @@ class WorkflowExecutionSensor(BaseSensorOperator):
     ):
         super().__init__(**kwargs)
 
-        self.success_states = success_states or {Execution.State.SUCCEEDED}
-        self.failure_states = failure_states or {Execution.State.FAILED, Execution.State.CANCELLED}
+        self.success_states = success_states or {Execution.State(Execution.State.SUCCEEDED)}
+        self.failure_states = failure_states or {
+            Execution.State(Execution.State.FAILED),
+            Execution.State(Execution.State.CANCELLED),
+        }
         self.workflow_id = workflow_id
         self.execution_id = execution_id
         self.location = location