You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/19 10:43:53 UTC
[airflow] branch main updated: Fix MyPy Errors for Qubole provider. (#20319)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d84196 Fix MyPy Errors for Qubole provider. (#20319)
7d84196 is described below
commit 7d84196519fcdbf96204d754d95c4dbca1ba9121
Author: Dmytro Kazanzhy <dk...@gmail.com>
AuthorDate: Sun Dec 19 12:43:29 2021 +0200
Fix MyPy Errors for Qubole provider. (#20319)
Co-authored-by: Dmytro Kazanzhy <dk...@demandbase.com>
---
airflow/providers/qubole/hooks/qubole.py | 4 ++--
airflow/providers/qubole/hooks/qubole_check.py | 3 ++-
airflow/providers/qubole/operators/qubole_check.py | 19 ++++++++++++++-----
3 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py
index e418513..59fa9b4 100644
--- a/airflow/providers/qubole/hooks/qubole.py
+++ b/airflow/providers/qubole/hooks/qubole.py
@@ -22,7 +22,7 @@ import logging
import os
import pathlib
import time
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
from qds_sdk.commands import (
Command,
@@ -134,7 +134,7 @@ class QuboleHook(BaseHook):
self.dag_id = kwargs['dag'].dag_id
self.kwargs = kwargs
self.cls = COMMAND_CLASSES[self.kwargs['command_type']]
- self.cmd = None
+ self.cmd: Optional[Command] = None
self.task_instance = None
@staticmethod
diff --git a/airflow/providers/qubole/hooks/qubole_check.py b/airflow/providers/qubole/hooks/qubole_check.py
index 9cafa39..5dba31c 100644
--- a/airflow/providers/qubole/hooks/qubole_check.py
+++ b/airflow/providers/qubole/hooks/qubole_check.py
@@ -23,6 +23,7 @@ from typing import List, Optional, Union
from qds_sdk.commands import Command
from airflow.exceptions import AirflowException
+from airflow.hooks.dbapi import DbApiHook
from airflow.providers.qubole.hooks.qubole import QuboleHook
log = logging.getLogger(__name__)
@@ -74,7 +75,7 @@ def parse_first_row(row_list) -> List[Union[bool, float, int, str]]:
return record_list
-class QuboleCheckHook(QuboleHook):
+class QuboleCheckHook(QuboleHook, DbApiHook):
"""Qubole check hook"""
def __init__(self, context, *args, **kwargs) -> None:
diff --git a/airflow/providers/qubole/operators/qubole_check.py b/airflow/providers/qubole/operators/qubole_check.py
index 39021df..bd5d424 100644
--- a/airflow/providers/qubole/operators/qubole_check.py
+++ b/airflow/providers/qubole/operators/qubole_check.py
@@ -27,11 +27,14 @@ from airflow.providers.qubole.operators.qubole import QuboleOperator
class _QuboleCheckOperatorMixin:
"""This is a Mixin for Qubole related check operators"""
+ kwargs: dict
+ results_parser_callable: Optional[Callable]
+
def execute(self, context=None) -> None:
"""Execute a check operation against Qubole"""
try:
self._hook_context = context
- super().execute(context=context)
+ super().execute(context=context) # type: ignore[misc]
except AirflowException as e:
handle_airflow_exception(e, self.get_hook())
@@ -39,9 +42,11 @@ class _QuboleCheckOperatorMixin:
"""Get QuboleCheckHook"""
return self.get_hook()
- # this overwrite the original QuboleOperator.get_hook() which returns a QuboleHook.
def get_hook(self) -> QuboleCheckHook:
- """Reinitialising the hook, as some template fields might have changed"""
+ """
+ Reinitialising the hook, as some template fields might have changed
+ This method overwrites the original QuboleOperator.get_hook() which returns a QuboleHook.
+ """
return QuboleCheckHook(
context=self._hook_context, results_parser_callable=self.results_parser_callable, **self.kwargs
)
@@ -107,7 +112,11 @@ class QuboleCheckOperator(_QuboleCheckOperatorMixin, SQLCheckOperator, QuboleOpe
ui_fgcolor = '#000'
def __init__(
- self, *, qubole_conn_id: str = "qubole_default", results_parser_callable: Callable = None, **kwargs
+ self,
+ *,
+ qubole_conn_id: str = "qubole_default",
+ results_parser_callable: Optional[Callable] = None,
+ **kwargs,
) -> None:
sql = get_sql_from_qbol_cmd(kwargs)
kwargs.pop('sql', None)
@@ -168,7 +177,7 @@ class QuboleValueCheckOperator(_QuboleCheckOperatorMixin, SQLValueCheckOperator,
*,
pass_value: Union[str, int, float],
tolerance: Optional[Union[int, float]] = None,
- results_parser_callable: Callable = None,
+ results_parser_callable: Optional[Callable] = None,
qubole_conn_id: str = "qubole_default",
**kwargs,
) -> None: