Posted to commits@airflow.apache.org by "kyle-winkelman (via GitHub)" <gi...@apache.org> on 2023/02/23 21:01:28 UTC
[GitHub] [airflow] kyle-winkelman opened a new issue, #29733: Databricks create/reset then run-now
kyle-winkelman opened a new issue, #29733:
URL: https://github.com/apache/airflow/issues/29733
### Description
Allow an Airflow DAG to define a Databricks job with the /2.1/jobs/create (or /2.1/jobs/reset) endpoint and then run that same job with the /2.1/jobs/run-now endpoint. This would give capabilities similar to the DatabricksSubmitRun operator, but the /2.1/jobs/create endpoint supports additional parameters that /2.1/jobs/runs/submit doesn't (e.g. job_clusters, email notifications, etc.).
### Use case/motivation
Create and run a Databricks job entirely within the Airflow DAG. Currently, the DatabricksSubmitRun operator uses the /2.1/jobs/runs/submit endpoint, which doesn't support all features and creates runs that aren't tied to a job in the Databricks UI. The DatabricksRunNow operator, on the other hand, requires you to define the job either directly in the Databricks UI or through a separate CI/CD pipeline, which means changing code in multiple places.
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kyle-winkelman commented on issue #29733: Databricks create/reset then run-now
Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442437525
I have spoken to my Databricks representative, and they don't place a high priority on bringing the /2.1/jobs/runs/submit API up to parity with all the options available in /2.1/jobs/create, so I don't expect the DatabricksSubmitRun operator to support those options any time soon.
[GitHub] [airflow] boring-cyborg[bot] commented on issue #29733: Databricks create/reset then run-now
Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442426125
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] kyle-winkelman commented on issue #29733: Databricks create/reset then run-now
Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442445629
I have come up with a workaround that uses the SimpleHttpOperator to hit the `api/2.1/jobs/create` (or `api/2.1/jobs/reset`) endpoint directly and then pushes the `job_id` via XCom for use by the DatabricksRunNow operator.
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id='CREATE_OR_RESET_THEN_RUN_NOW',
    schedule_interval='@hourly',
    start_date=datetime(2020, 1, 1),
    catchup=False,
) as dag:

    def databricks_jobs_operator(group_id, job, conn_id='databricks_default'):
        with TaskGroup(group_id=group_id) as tg:
            # First run: no prior XCom exists, so POST the job settings to
            # api/2.1/jobs/create. Later runs: the prior XCom holds the
            # job_id, so POST {'job_id': ..., 'new_settings': ...} to
            # api/2.1/jobs/reset instead.
            jobs_create_or_reset = SimpleHttpOperator(
                task_id='create_or_reset',
                http_conn_id=conn_id,
                endpoint=f"{{{{ 'api/2.1/jobs/create' if ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True) is none else 'api/2.1/jobs/reset' }}}}",
                method='POST',
                data=f"{{{{ ({job} if ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True) is none else {{'job_id': ti.xcom_pull(task_ids='{group_id}.create_or_reset', include_prior_dates=True), 'new_settings': {job}}}) | tojson }}}}",
                headers={'Content-Type': 'application/json'},
                # jobs/create returns the new job_id; jobs/reset returns an
                # empty body, so re-push the job_id pulled from the prior run.
                response_filter=lambda response, ti: response.json()['job_id']
                if ti.xcom_pull(task_ids=f"{group_id}.create_or_reset", include_prior_dates=True) is None
                else ti.xcom_pull(task_ids=f"{group_id}.create_or_reset", include_prior_dates=True),
            )
            jobs_run_now = DatabricksRunNowOperator(
                task_id='run_now',
                job_id=f"{{{{ ti.xcom_pull(task_ids='{group_id}.create_or_reset') }}}}",
                databricks_conn_id=conn_id,
            )
            jobs_create_or_reset >> jobs_run_now
        return tg

    job = {
        'name': 'test',
        'tasks': [
            {
                'task_key': 'test1',
                'notebook_task': {'notebook_path': '/Users/abc@example.com/test2'},
                'job_cluster_key': 'job_cluster',
            },
            {
                'task_key': 'test2',
                'depends_on': [{'task_key': 'test1'}],
                'notebook_task': {'notebook_path': '/Users/abc@example.com/test2'},
                'job_cluster_key': 'job_cluster',
            },
        ],
        'job_clusters': [
            {
                'job_cluster_key': 'job_cluster',
                'new_cluster': {
                    'spark_version': '10.4.x-scala2.12',
                    'node_type_id': 'Standard_D3_v2',
                    'num_workers': 2,
                },
            },
        ],
    }

    test = databricks_jobs_operator(group_id='test', job=job)
```
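For anyone squinting at the escaped Jinja in the workaround, the decision it encodes boils down to this plain-Python sketch (the helper name is illustrative, not part of any provider):

```python
def create_or_reset_request(job_settings, prior_job_id=None):
    """Pick the Jobs API call: create on the first run, reset afterwards.

    Mirrors the templated logic in the DAG above: when no job_id has been
    pushed to XCom yet, POST the settings to jobs/create; once a job_id
    exists, POST to jobs/reset with the settings wrapped in new_settings.
    """
    if prior_job_id is None:
        return 'api/2.1/jobs/create', job_settings
    return 'api/2.1/jobs/reset', {'job_id': prior_job_id, 'new_settings': job_settings}
```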
Re: [I] Databricks create/reset then run-now [airflow]
Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro closed issue #29733: Databricks create/reset then run-now
URL: https://github.com/apache/airflow/issues/29733
[GitHub] [airflow] hussein-awala commented on issue #29733: Databricks create/reset then run-now
Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29733:
URL: https://github.com/apache/airflow/issues/29733#issuecomment-1442465033
Sounds like a good feature. You could create a new operator, `DatabricksCreateJobOperator`, with a boolean argument `reset` that specifies whether to create the job or overwrite its settings, and have it return the job id as an XCom so it can be used in later steps (probably with `DatabricksRunNowOperator`).
@kyle-winkelman I have assigned it to you.
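A minimal sketch of what such an operator might look like. The class name and `reset` flag follow the suggestion above, but the hook interface (`create_job`/`reset_job`) is an assumption for illustration, not the actual Airflow provider API; a real implementation would subclass `BaseOperator` and use `DatabricksHook`.

```python
class DatabricksCreateJobOperator:
    """Create a Databricks job (or reset an existing one) and return its job_id.

    Sketch only: `hook` stands in for a DatabricksHook, and the
    create_job/reset_job methods on it are hypothetical.
    """

    def __init__(self, json, job_id=None, reset=False, hook=None):
        self.json = json      # jobs/create-style job settings
        self.job_id = job_id  # required when reset=True
        self.reset = reset
        self.hook = hook      # stand-in for DatabricksHook

    def execute(self, context=None):
        if self.reset:
            # POST api/2.1/jobs/reset: overwrite settings, keep the job_id.
            self.hook.reset_job(self.job_id, self.json)
            return self.job_id
        # POST api/2.1/jobs/create: the response carries the new job_id,
        # which Airflow would push as the task's return-value XCom.
        return self.hook.create_job(self.json)['job_id']
```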