You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/25 12:47:45 UTC
[GitHub] [airflow] Lazloo commented on issue #25286: Error description cannot be shown
Lazloo commented on issue #25286:
URL: https://github.com/apache/airflow/issues/25286#issuecomment-1194004837
Here is the code that runs the deployment:
```
from airflow import AirflowException
from airflow.models import Variable
from airflow.providers.databricks.operators.databricks import (
DatabricksRunNowOperator,
XCOM_RUN_ID_KEY,
XCOM_RUN_PAGE_URL_KEY,
_handle_databricks_operator_execution
)
class DatabricksRunNowAttachOperator(DatabricksRunNowOperator):
    """
    Custom operator extending the existing DatabricksRunNowOperator.

    The difference is that it stores the run_id in an Airflow Variable and
    uses it to attach to the run if the operator is restarted.
    This pattern avoids newly started jobs in case the operator is restarted,
    e.g. because of a scheduler restart.
    """

    def execute(self, context):
        """
        Start a Databricks run, or attach to the run recorded for this task.

        The Variable name is built from ``context['task_instance_key_str']``
        and ``XCOM_RUN_ID_KEY``. If that Variable exists, its value is used
        as the run id; otherwise a new run is started and its id is stored.

        :param context: Airflow task context; must contain
            ``task_instance_key_str``.
        :raises AirflowException: re-raised unchanged from
            ``_handle_databricks_operator_execution``.
        """
        variable_key = f"{context['task_instance_key_str']}_{XCOM_RUN_ID_KEY}"
        existing_job_run = Variable.get(key=variable_key, default_var=None)
        hook = self._get_hook()

        if existing_job_run is None:
            self.log.info("No job run found for this task, starting new run in Databricks.")
            self.run_id = hook.run_now(self.json)
            self.log.debug("Job started. Writing run id to job variable %s", variable_key)
            Variable.set(key=variable_key, value=self.run_id)
        else:
            self.log.info(
                "Found existing job run %s for this task. Attaching to this run.",
                existing_job_run,
            )
            # The Variable hands the id back as text, whereas a freshly started
            # run assigns whatever hook.run_now returned. Normalise to int so
            # run_id has the same type on both paths; keep the raw value if it
            # is not numeric rather than failing the task here.
            try:
                self.run_id = int(existing_job_run)
            except ValueError:
                self.run_id = existing_job_run

        try:
            _handle_databricks_operator_execution(self, hook, self.log, context)
        except AirflowException as e:
            # Only drop the Variable when the message reports a terminal
            # failure, so the next attempt starts a new run. For any other
            # AirflowException the Variable is kept and the next attempt
            # re-attaches to the same run.
            if "failed with terminal state" in str(e):
                self.log.debug("Databricks job failed. Cleaning job variable %s", variable_key)
                Variable.delete(key=variable_key)
            raise

        self.log.debug("Databricks job terminated. Cleaning job variable %s", variable_key)
        Variable.delete(key=variable_key)
# Example instantiation: build the attach-capable operator for one Databricks
# job, taking the task id, job id, polling interval and retry count from
# ``job_config``.
# NOTE(review): ``job_config`` is not defined in this snippet — presumably it
# is supplied by surrounding DAG code; confirm against the caller.
# NOTE(review): the created operator is not bound to a name here; whether it
# is registered with a DAG depends on context not shown in this snippet.
DatabricksRunNowAttachOperator(
task_id=job_config.name,
job_id=job_config.id,
polling_period_seconds=job_config.polling_period_seconds,
retries=job_config.retries
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org