Posted to commits@airflow.apache.org by "kyle-winkelman (via GitHub)" <gi...@apache.org> on 2023/02/23 21:01:28 UTC

[GitHub] [airflow] kyle-winkelman opened a new issue, #29733: Databricks create/reset then run-now

kyle-winkelman opened a new issue, #29733:
URL: https://github.com/apache/airflow/issues/29733

   ### Description
   
   Allow an Airflow DAG to define a Databricks job with the /2.1/jobs/create (or /2.1/jobs/reset) endpoint, then run that same job with the /2.1/jobs/run-now endpoint. This would give capabilities similar to the DatabricksSubmitRun operator, but the /2.1/jobs/create endpoint supports additional parameters that /2.1/jobs/runs/submit doesn't (e.g. job_clusters, email notifications, etc.).
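   
   Sketched as plain calls against the Jobs 2.1 API, the requested flow would look roughly like this (an illustration only, not provider code; `api_call` is a hypothetical stand-in for an authenticated Databricks REST client):
   
   ```python
   def create_or_reset_then_run(api_call, job_settings, job_id=None):
       """Define a job with /2.1/jobs/create (or update it in place with
       /2.1/jobs/reset once its job_id is known), then trigger it with
       /2.1/jobs/run-now."""
       if job_id is None:
           # First deployment: create the job and remember its id.
           job_id = api_call('POST', 'api/2.1/jobs/create', job_settings)['job_id']
       else:
           # Subsequent deployments: overwrite the existing job's settings.
           api_call('POST', 'api/2.1/jobs/reset',
                    {'job_id': job_id, 'new_settings': job_settings})
       run = api_call('POST', 'api/2.1/jobs/run-now', {'job_id': job_id})
       return job_id, run['run_id']
   ```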
   
   ### Use case/motivation
   
   Create and run a Databricks job entirely within the Airflow DAG. Currently, the DatabricksSubmitRun operator uses the /2.1/jobs/runs/submit endpoint, which doesn't support all job features and creates runs that aren't tied to a job in the Databricks UI. The DatabricksRunNow operator, meanwhile, requires the job to be defined either directly in the Databricks UI or through a separate CI/CD pipeline, which means changing code in multiple places.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kyle-winkelman commented on issue #29733: Databricks create/reset then run-now

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442437525

   I have spoken to my Databricks representative, and syncing the /2.1/jobs/runs/submit API with all the options available in /2.1/jobs/create is not a high priority for them, so I don't expect those options to be supported by the DatabricksSubmitRun operator any time soon.




[GitHub] [airflow] boring-cyborg[bot] commented on issue #29733: Databricks create/reset then run-now

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442426125

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] kyle-winkelman commented on issue #29733: Databricks create/reset then run-now

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442445629

   I have come up with a workaround that uses the SimpleHttpOperator to hit the `api/2.1/jobs/create` (or `api/2.1/jobs/reset`) endpoint directly and then pushes the `job_id` to XCom for use by the DatabricksRunNow operator.
   ```python
   from airflow import DAG
   from airflow.models import Variable
   from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
   from airflow.providers.http.operators.http import SimpleHttpOperator
   from airflow.utils.task_group import TaskGroup
   from datetime import datetime
   
   with DAG(dag_id='CREATE_OR_RESET_THEN_RUN_NOW', schedule_interval='@hourly', start_date=datetime(2020, 1, 1), catchup=False) as dag:

       def databricks_jobs_operator(group_id, job, conn_id='databricks_default'):
           with TaskGroup(group_id=group_id) as tg:
               # First run: no prior XCom exists, so POST to jobs/create.
               # Later runs: the XCom holds the job_id, so POST the same
               # settings to jobs/reset wrapped in 'new_settings'.
               jobs_create_or_reset = SimpleHttpOperator(
                   task_id='create_or_reset',
                   http_conn_id=conn_id,
                   endpoint=f"{{{{ 'api/2.1/jobs/create' if ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True) is none else 'api/2.1/jobs/reset' }}}}",
                   method='POST',
                   data=f"{{{{ ({job} if ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True) is none else {{'job_id': ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True), 'new_settings': {job}}}) | tojson }}}}",
                   headers={'Content-Type': 'application/json'},
                   # Push the job_id to XCom: read it from the create response,
                   # or carry the existing id forward after a reset.
                   response_filter=lambda response, ti: response.json()['job_id'] if ti.xcom_pull(task_ids=f"{group_id}.create_or_reset", include_prior_dates=True) is None else ti.xcom_pull(task_ids=f"{group_id}.create_or_reset", include_prior_dates=True),
               )
               jobs_run_now = DatabricksRunNowOperator(
                   task_id='run_now',
                   job_id=f"{{{{ ti.xcom_pull(task_ids='{group_id}.create_or_reset') }}}}",
                   databricks_conn_id=conn_id,
               )
               jobs_create_or_reset >> jobs_run_now
           return tg
   
       job = {
           'name': 'test',
           'tasks': [
               {
                   'task_key': 'test1',
                   'notebook_task': {
                       'notebook_path': '/Users/abc@example.com/test2'
                   },
                   'job_cluster_key': 'job_cluster'
               },
               {
                   'task_key': 'test2',
                   'depends_on': [
                       {
                           'task_key': 'test1'
                       }
                   ],
                   'notebook_task': {
                       'notebook_path': '/Users/abc@example.com/test2'
                   },
                   'job_cluster_key': 'job_cluster'
               }
           ],
           'job_clusters': [
               {
                   'job_cluster_key': 'job_cluster',
                   'new_cluster': {
                       'spark_version': '10.4.x-scala2.12',
                       'node_type_id': 'Standard_D3_v2',
                       'num_workers': 2,
                   }
               }
           ]
       }
   
       test = databricks_jobs_operator(group_id='test', job=job)
   ```




Re: [I] Databricks create/reset then run-now [airflow]

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro closed issue #29733: Databricks create/reset then run-now
URL: https://github.com/apache/airflow/issues/29733




[GitHub] [airflow] hussein-awala commented on issue #29733: Databricks create/reset then run-now

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442465033

   Sounds like a good feature. You could create a new operator `DatabricksCreateJobOperator` with a boolean argument `reset` that specifies whether to create the job or overwrite its settings, and have it return the job id as XCom for use in subsequent steps (probably with `DatabricksRunNowOperator`).
   
   @kyle-winkelman I have assigned it to you.
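   
   A rough shape for such an operator (a sketch only, not provider code: a real implementation would subclass `BaseOperator` and make its calls through `DatabricksHook`; here `api_call` is an injected stand-in so the control flow is visible):
   
   ```python
   class DatabricksCreateJobOperator:
       """Sketch: create a Databricks job, or overwrite its settings when
       reset=True, and return the job_id. Airflow pushes an operator's
       return value to XCom, so a downstream DatabricksRunNowOperator
       could pull it."""
   
       def __init__(self, json, api_call, reset=False, job_id=None):
           self.json = json          # full job settings for /2.1/jobs/create
           self.api_call = api_call  # stand-in for the hook's REST call
           self.reset = reset
           self.job_id = job_id      # required when reset=True
   
       def execute(self, context=None):
           if self.reset:
               # Overwrite an existing job's settings, keep its id.
               self.api_call('POST', 'api/2.1/jobs/reset',
                             {'job_id': self.job_id, 'new_settings': self.json})
               return self.job_id
           # Create a new job and return the id Databricks assigns.
           return self.api_call('POST', 'api/2.1/jobs/create', self.json)['job_id']
   ```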

