Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/29 07:33:23 UTC

[GitHub] [airflow] mniehoff opened a new issue #15588: Operator is restarted after Scheduler restart

mniehoff opened a new issue #15588:
URL: https://github.com/apache/airflow/issues/15588


   **Apache Airflow version**: 2.0.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**: 
   
   - **Cloud provider or hardware configuration**: Hosted version of astronomer cloud
   - **OS** (e.g. from /etc/os-release): 
   - **Kernel** (e.g. `uname -a`):
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   - Using the DatabricksRunNowOperator to start a job in Databricks.
   - The scheduler gets restarted (e.g. by deploying to Astronomer or because the scheduler lost the connection to the database).
   - Directly after the scheduler comes back up, the Databricks task is marked as failed. No exception is visible in either the task log or the scheduler log, just `"ERROR - Marking run <DagRun ...., externally triggered: False> failed"`.
   - The Databricks task is started again: "Starting attempt 2 of 1".
   - The operator tries to start a new Databricks job run. This fails because concurrent runs are limited to 1 on the Databricks side. The second task attempt is also marked as failed in Airflow.
   - The Databricks job run is still running, but no Airflow task is polling for its status.
   
   **What you expected to happen**:
   
   The existing task keeps running after the scheduler has been restarted and continues to poll for the Databricks job run status.
   
   **How to reproduce it**:
   
   See "What happened" above. A simple DAG with a DatabricksOperator and then restarting the scheduler works for us to reproduce the issue.
   
   **Anything else we need to know**:
   
   The problem occurs deterministically whenever the scheduler is restarted while a Databricks operator is running.
   
   Last output of the first task:
   
   ```
   [2021-04-29 06:44:13,509] {databricks.py:90} INFO - import_raw_and_process in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
   [2021-04-29 06:44:13,510] {databricks.py:91} INFO - View run status, Spark UI, and logs at https://xxxxxxxxxx.cloud.databricks.com#job/xx/run/xxxx
   [2021-04-29 06:44:13,510] {databricks.py:92} INFO - Sleeping for 60 seconds.
   ```
   
   Output of the second task:
   
   <details><summary>log of the second task</summary>
   
   ```
   [2021-04-29 06:44:37,071] {taskinstance.py:852} INFO - Dependencies all met for <TaskInstance:task_id 2021-04-28T04:05:00+00:00 [queued]>
   [2021-04-29 06:44:37,094] {taskinstance.py:852} INFO - Dependencies all met for <TaskInstance:task_id 2021-04-28T04:05:00+00:00 [queued]>
   [2021-04-29 06:44:37,094] {taskinstance.py:1043} INFO - 
   --------------------------------------------------------------------------------
   [2021-04-29 06:44:37,094] {taskinstance.py:1044} INFO - Starting attempt 2 of 1
   [2021-04-29 06:44:37,094] {taskinstance.py:1045} INFO - 
   --------------------------------------------------------------------------------
   [2021-04-29 06:44:37,163] {taskinstance.py:1064} INFO - Executing <Task(DatabricksRunNowOperator): import_raw_and_process> on 2021-04-28T04:05:00+00:00
   [2021-04-29 06:44:37,168] {standard_task_runner.py:52} INFO - Started process 214 to run task
   [2021-04-29 06:44:37,173] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xxx.prod.integration', 'import_raw_and_process', '2021-04-28T04:05:00+00:00', '--job-id', '21291', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/xxx/prod/integration.py', '--cfg-path', '/tmp/tmpyneez1sp', '--error-file', '/tmp/tmp6cuo7gkp']
   [2021-04-29 06:44:37,175] {standard_task_runner.py:77} INFO - Job 21291: Subtask import_raw_and_process
   [2021-04-29 06:44:37,272] {logging_mixin.py:103} INFO - Running <TaskInstance:task_id 2021-04-28T04:05:00+00:00 [running]> on host spherical-antenna-8596-scheduler-55f666944b-xfwqg
   [2021-04-29 06:44:37,392] {taskinstance.py:1258} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_EMAIL=xxxx
   AIRFLOW_CTX_DAG_OWNER=datascience
   AIRFLOW_CTX_DAG_ID=xxx.prod.integration
   AIRFLOW_CTX_TASK_ID=import_raw_and_process
   AIRFLOW_CTX_EXECUTION_DATE=2021-04-28T04:05:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-04-28T04:05:00+00:00
   [2021-04-29 06:44:37,395] {base.py:74} INFO - Using connection to: id: databricks_default. Host: https://xxxx.cloud.databricks.com, Port: None, Schema: , Login: token, Password: None, extra: XXXXXXXX
   [2021-04-29 06:44:37,396] {databricks.py:170} INFO - Using token auth. 
   [2021-04-29 06:44:37,964] {databricks.py:73} INFO - Run submitted with run_id: 106466
   [2021-04-29 06:44:37,965] {databricks.py:170} INFO - Using token auth. 
   [2021-04-29 06:44:38,286] {databricks.py:78} INFO - View run status, Spark UI, and logs at https://xxxx.cloud.databricks.com#job/xx/run/xxxx
   [2021-04-29 06:44:38,287] {databricks.py:170} INFO - Using token auth. 
   [2021-04-29 06:44:38,690] {taskinstance.py:1457} ERROR - Task failed with exception
   [2021-04-29 06:44:38,693] {taskinstance.py:1507} INFO - Marking task as FAILED. dag_id=xxx.prod.integration, task_id=import_raw_and_process, execution_date=20210428T040500, start_date=20210429T064437, end_date=20210429T064438
   [2021-04-29 06:44:38,711] {email.py:184} INFO - Email alerting: attempt 1
   [2021-04-29 06:44:39,054] {email.py:196} INFO - Sent an alert email to ['xxxx']
   [2021-04-29 06:44:41,250] {local_task_job.py:142} INFO - Task exited with return code 1
   ```
   </details>
   
   





[GitHub] [airflow] github-actions[bot] closed issue #15588: Task is retried after Scheduler restart

github-actions[bot] closed issue #15588:
URL: https://github.com/apache/airflow/issues/15588


   





[GitHub] [airflow] mniehoff commented on issue #15588: Task is retried after Scheduler restart

mniehoff commented on issue #15588:
URL: https://github.com/apache/airflow/issues/15588#issuecomment-842281159


   I dug a bit deeper into it, and the problem seems to be within the Databricks operator (to be honest, I was expecting it to be the operator rather than the scheduler).
   
   In comparison to e.g. BigQuery, Databricks job runs do not have an id that you can set from the outside; it is generated as soon as a new job run is triggered.
   The BigQueryJobOperator creates the job id by itself, and if the operator is restarted due to a scheduler restart, it checks whether a job with this id is already running and, if so, "attaches" to the running job.
   
   For Databricks this is not possible, as the id of an individual job run cannot be set from the outside.
   A few options I see to mitigate this:
   
   1) Save the job run id somewhere (not sure where) where it survives the scheduler restart and can be picked up by the operator, so the operator could reattach to the job run. The run id would be deleted when the job has finished.
   
   2) Always reattach if there is a running job run for a given job. This would work in my case, but not in general, as Databricks allows concurrent runs for a job, and there will definitely be cases where a job run already exists and a new run should still be triggered.
   
   3) Currently the operator polls. One could give the operator an `async=True` flag that exits the operator right after the Databricks job run has been started, and then use a sensor to poll for the job run status (the run id is already available via XCom).
   
   IMHO only 1) and 3) are feasible, but I am not sure where to store the job run id so that it survives the scheduler restart. 3) is, in my opinion, the cleanest solution; a rough sketch of what it could look like is below.
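   Roughly what I have in mind for 3), built on the existing `DatabricksHook`; class names, parameters and the XCom wiring are just placeholders for the sketch:
   
   ```python
   from airflow.exceptions import AirflowException
   from airflow.models import BaseOperator
   from airflow.providers.databricks.hooks.databricks import DatabricksHook
   from airflow.sensors.base import BaseSensorOperator
   
   
   class DatabricksRunNowAsyncOperator(BaseOperator):
       """Sketch: trigger the job run and exit immediately instead of polling."""
   
       def __init__(self, *, job_id, notebook_params=None,
                    databricks_conn_id="databricks_default", **kwargs):
           super().__init__(**kwargs)
           self.job_id = job_id
           self.notebook_params = notebook_params or {}
           self.databricks_conn_id = databricks_conn_id
   
       def execute(self, context):
           hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)
           run_id = hook.run_now({"job_id": self.job_id,
                                  "notebook_params": self.notebook_params})
           # The return value is pushed to XCom, so a downstream sensor can find the run.
           return run_id
   
   
   class DatabricksRunSensor(BaseSensorOperator):
       """Sketch: poll the run state for the run id pushed by the submit task."""
   
       def __init__(self, *, submit_task_id, databricks_conn_id="databricks_default", **kwargs):
           super().__init__(**kwargs)
           self.submit_task_id = submit_task_id
           self.databricks_conn_id = databricks_conn_id
   
       def poke(self, context):
           run_id = context["ti"].xcom_pull(task_ids=self.submit_task_id)
           hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)
           state = hook.get_run_state(run_id)
           if state.is_terminal:
               if not state.is_successful:
                   raise AirflowException(f"Databricks run {run_id} ended in state {state}")
               return True
           return False
   ```
   
   The sensor would be cheap to retry: if a scheduler restart kills it, the next attempt pulls the same run id from XCom and keeps polling, instead of submitting a second run.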
   
   Let me know what you think. I definitely aim to contribute these changes back to the provider packages.
   



[GitHub] [airflow] github-actions[bot] commented on issue #15588: Task is retried after Scheduler restart

github-actions[bot] commented on issue #15588:
URL: https://github.com/apache/airflow/issues/15588#issuecomment-933055382


   This issue has been closed because it has not received response from the issue author.





[GitHub] [airflow] boring-cyborg[bot] commented on issue #15588: Operator is restarted after Scheduler restart

boring-cyborg[bot] commented on issue #15588:
URL: https://github.com/apache/airflow/issues/15588#issuecomment-829006743


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] eladkal commented on issue #15588: Task is retried after Scheduler restart

eladkal commented on issue #15588:
URL: https://github.com/apache/airflow/issues/15588#issuecomment-907108860


   > IMHO only 1) and 3) are feasible, but I am not sure where to store the job run id so that it survives the scheduler restart. 3) is, in my opinion, the cleanest solution.
   
   The scheduler shouldn't care about it. This is something that needs to be handled by the operator itself.
   I'm not sure I follow the problem, because `DatabricksRunNowOperator` has a `job_id` parameter that you configure:
   
   https://github.com/apache/airflow/blob/866a601b76e219b3c043e1dbbc8fb22300866351/airflow/providers/databricks/operators/databricks.py#L462
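   
   For reference, that parameter is set directly on the operator, e.g. (placeholder job id and task id):
   
   ```python
   from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
   
   # job_id refers to an existing job definition in Databricks and is a regular
   # operator argument, so it is fully under the DAG author's control.
   run_job = DatabricksRunNowOperator(
       task_id="run_job",
       job_id=42,
       databricks_conn_id="databricks_default",
   )
   ```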
   
   In any case, if you think there is a problem to fix, just open a PR with the approach you find best to solve it.





[GitHub] [airflow] github-actions[bot] commented on issue #15588: Task is retried after Scheduler restart

github-actions[bot] commented on issue #15588:
URL: https://github.com/apache/airflow/issues/15588#issuecomment-927399211


   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

