You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/25 12:47:45 UTC

[GitHub] [airflow] Lazloo commented on issue #25286: Error description cannot be shown

Lazloo commented on issue #25286:
URL: https://github.com/apache/airflow/issues/25286#issuecomment-1194004837

   Here the code that run the deployment:
   
   ```
   from airflow import AirflowException
   from airflow.models import Variable
   from airflow.providers.databricks.operators.databricks import (
       DatabricksRunNowOperator,
       XCOM_RUN_ID_KEY,
       XCOM_RUN_PAGE_URL_KEY,
       _handle_databricks_operator_execution
   )
   
   class DatabricksRunNowAttachOperator(DatabricksRunNowOperator):
       """
       Custom operator extending the existing DatabricksRunNowOperator.
   
       The difference is, that it stores the the run_id in a airflow variable and
       uses this to attach to the run if the operator is restarted
   
       This pattern avoids newly started jobs in case the operator is restarted, e.g. because of a scheduler restart.
       """
   
       def execute(self, context):
           variable_key = f"{context['task_instance_key_str']}_{XCOM_RUN_ID_KEY}"
           existing_job_run = Variable.get(key=variable_key, default_var=None)
           hook = self._get_hook()
           if existing_job_run is None:
               self.log.info("No job run found for this task, starting new run in Databricks.")
               self.run_id = hook.run_now(self.json)
               self.log.debug(f"Job started. Writing run id to job variable {variable_key}")
               Variable.set(key=variable_key, value=self.run_id)
           else:
               self.log.info(f"Found existing job run {existing_job_run} for this task. Attaching to this run.")
               self.run_id = existing_job_run
   
           try:
               _handle_databricks_operator_execution(self, hook, self.log, context)
           except AirflowException as e:
               if "failed with terminal state" in str(e):
                   self.log.debug(f"Databricks job failed. Cleaning job variable {variable_key}")
                   Variable.delete(key=variable_key)
               raise e
           self.log.debug(f"Databricks job terminated. Cleaning job variable {variable_key}")
           Variable.delete(key=variable_key)
   
   DatabricksRunNowAttachOperator(
                   task_id=job_config.name,
                   job_id=job_config.id,
                   polling_period_seconds=job_config.polling_period_seconds,
                   retries=job_config.retries
               )
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org