Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 12:29:17 UTC
[GitHub] [airflow] kotaaaa commented on issue #22907: Add SKIP process in addition to occurring ERROR in SQLCheckOperator
kotaaaa commented on issue #22907:
URL: https://github.com/apache/airflow/issues/22907#issuecomment-1094989998
@eladkal, thank you for your reply!
In my case, the process branches depending on the result of `select count(*) from {bigquery's table}`, so I created a class like the one below.
I think this is not specific to my case: in general use cases, too, the result of a query against a BigQuery table may decide whether the process should be skipped or should proceed.
(The code below is actually for Airflow v1.)
```
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowSkipException
from airflow.models import BaseOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.decorators import apply_defaults


class SQLCheckSkipOperator(BaseOperator):
    ...

    def execute(self, context=None):
        self.log.info("Executing SQL check: %s", self.sql)
        # get_first() returns only the first row, e.g. (0,) for count(*) on an empty table
        records = self.get_db_hook().get_first(self.sql)
        self.log.info("Record: %s", records)
        if not records:
            # No row returned: skip this task (under the default all_success
            # trigger rule, its downstream tasks are then skipped as well)
            raise AirflowSkipException("The query returned None")
        elif not all(records):
            # At least one value in the row is falsy (e.g. count(*) == 0):
            # skip instead of failing as SQLCheckOperator would
            raise AirflowSkipException(
                "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
                    query=self.sql, records=records
                )
            )
        self.log.info("Success.")

    ...


class BigQueryCheckSkipOperator(SQLCheckSkipOperator):
    ...
```
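The skip decision itself does not depend on Airflow, so it can be checked in isolation. Below is a minimal sketch that mirrors the two checks in `execute()` as plain Python. `SkipTask`, `check_first_row` and `decide` are hypothetical names, and `SkipTask` only stands in for `AirflowSkipException`. It also shows why `select count(*)` works as a branch condition: an empty table yields the row `(0,)`, which is a non-empty tuple (so the first check passes) but fails `all(records)`.

```python
from typing import Optional, Sequence


class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def check_first_row(sql: str, records: Optional[Sequence[object]]) -> None:
    """Apply the same truthiness rules as SQLCheckSkipOperator.execute.

    Raises SkipTask when the first row is missing or contains any falsy
    value (0, None, False, empty string); returns None otherwise.
    """
    if not records:
        # get_first() returned None (or an empty row): nothing to check.
        raise SkipTask("The query returned None")
    if not all(records):
        # At least one column is falsy, e.g. count(*) == 0.
        raise SkipTask(
            "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
                query=sql, records=records
            )
        )


def decide(sql: str, records: Optional[Sequence[object]]) -> str:
    """Return 'skip' or 'proceed' instead of raising, for easy inspection."""
    try:
        check_first_row(sql, records)
    except SkipTask:
        return "skip"
    return "proceed"


if __name__ == "__main__":
    query = "select count(*) from my_dataset.my_table"  # hypothetical table
    print(decide(query, (0,)))   # empty table -> skip
    print(decide(query, (42,)))  # rows present -> proceed
    print(decide(query, None))   # no row at all -> skip
```

With `count(*)` the query always returns exactly one row, so in practice it is the `all(records)` check, not the `None` check, that turns a zero count into a skip.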
--
This is an automated message from the Apache Git Service.