You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/26 20:33:02 UTC
[airflow] branch main updated: DatabricksSubmitRunOperator dbt task support (#25623)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 25a9c6a905 DatabricksSubmitRunOperator dbt task support (#25623)
25a9c6a905 is described below
commit 25a9c6a9058b829fc038fdd3fc789890e563bd1d
Author: Gabor Ratky <ga...@databricks.com>
AuthorDate: Fri Aug 26 22:32:55 2022 +0200
DatabricksSubmitRunOperator dbt task support (#25623)
---
.../providers/databricks/operators/databricks.py | 23 ++++++--
.../operators/submit_run.rst | 4 +-
newsfragments/25623.improvement.rst | 1 +
.../databricks/operators/test_databricks.py | 62 +++++++++++++++++++++-
4 files changed, 83 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index dd614dca01..a722f977f3 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -193,28 +193,28 @@ class DatabricksSubmitRunOperator(BaseOperator):
:param spark_jar_task: The main class and parameters for the JAR task. Note that
the actual JAR is specified in the ``libraries``.
*EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
- *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
This field will be templated.
.. seealso::
https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobssparkjartask
:param notebook_task: The notebook path and parameters for the notebook task.
*EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
- *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
This field will be templated.
.. seealso::
https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobsnotebooktask
:param spark_python_task: The python file path and parameters to run the python file with.
*EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
- *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
This field will be templated.
.. seealso::
https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobssparkpythontask
:param spark_submit_task: Parameters needed to run a spark-submit command.
*EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
- *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
This field will be templated.
.. seealso::
@@ -222,11 +222,18 @@ class DatabricksSubmitRunOperator(BaseOperator):
:param pipeline_task: Parameters needed to execute a Delta Live Tables pipeline task.
The provided dictionary must contain at least ``pipeline_id`` field!
*EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
- *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
This field will be templated.
.. seealso::
https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobspipelinetask
+ :param dbt_task: Parameters needed to execute a dbt task.
+ The provided dictionary must contain at least the ``commands`` field, and the
+ ``git_source`` parameter must also be set.
+ *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
+ *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
+ This field will be templated.
+
:param new_cluster: Specs for a new cluster on which this task will be run.
*EITHER* ``new_cluster`` *OR* ``existing_cluster_id`` should be specified
(except when ``pipeline_task`` is used).
@@ -295,6 +302,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
spark_python_task: Optional[Dict[str, Union[str, List[str]]]] = None,
spark_submit_task: Optional[Dict[str, List[str]]] = None,
pipeline_task: Optional[Dict[str, str]] = None,
+ dbt_task: Optional[Dict[str, Union[str, List[str]]]] = None,
new_cluster: Optional[Dict[str, object]] = None,
existing_cluster_id: Optional[str] = None,
libraries: Optional[List[Dict[str, str]]] = None,
@@ -333,6 +341,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
self.json['spark_submit_task'] = spark_submit_task
if pipeline_task is not None:
self.json['pipeline_task'] = pipeline_task
+ if dbt_task is not None:
+ self.json['dbt_task'] = dbt_task
if new_cluster is not None:
self.json['new_cluster'] = new_cluster
if existing_cluster_id is not None:
@@ -352,6 +362,9 @@ class DatabricksSubmitRunOperator(BaseOperator):
if git_source is not None:
self.json['git_source'] = git_source
+ if 'dbt_task' in self.json and 'git_source' not in self.json:
+ raise AirflowException('git_source is required for dbt_task')
+
self.json = normalise_json_content(self.json)
# This variable will be used in case our task gets killed.
self.run_id: Optional[int] = None
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index fe7e23a58c..3e13930235 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -55,6 +55,7 @@ one named parameter for each top level parameter in the ``runs/submit`` endpoint
* ``spark_python_task`` - python file path and parameters to run the python file with
* ``spark_submit_task`` - parameters needed to run a ``spark-submit`` command
* ``pipeline_task`` - parameters needed to run a Delta Live Tables pipeline
+ * ``dbt_task`` - parameters needed to run a dbt project (requires ``git_source`` to be set)
* Cluster specification - it should be one of:
* ``new_cluster`` - specs for a new cluster on which this task will be run
@@ -68,9 +69,10 @@ Currently the named parameters that ``DatabricksSubmitRunOperator`` supports are
- ``spark_jar_task``
- ``notebook_task``
- ``spark_python_task``
- - ``spark_jar_task``
- ``spark_submit_task``
- ``pipeline_task``
+ - ``dbt_task``
+ - ``git_source``
- ``new_cluster``
- ``existing_cluster_id``
- ``libraries``
diff --git a/newsfragments/25623.improvement.rst b/newsfragments/25623.improvement.rst
new file mode 100644
index 0000000000..4cfa738dc8
--- /dev/null
+++ b/newsfragments/25623.improvement.rst
@@ -0,0 +1 @@
+``DatabricksSubmitRunOperator`` now supports ``dbt_task`` to run dbt projects on Databricks.
diff --git a/tests/providers/databricks/operators/test_databricks.py b/tests/providers/databricks/operators/test_databricks.py
index 875ee8c87d..f72487196f 100644
--- a/tests/providers/databricks/operators/test_databricks.py
+++ b/tests/providers/databricks/operators/test_databricks.py
@@ -64,6 +64,11 @@ RENDERED_TEMPLATED_JAR_PARAMS = [f'/test-{DATE}']
TEMPLATED_JAR_PARAMS = ['/test-{{ ds }}']
PYTHON_PARAMS = ["john doe", "35"]
SPARK_SUBMIT_PARAMS = ["--class", "org.apache.spark.examples.SparkPi"]
+DBT_TASK = {
+ "commands": ["dbt deps", "dbt seed", "dbt run"],
+ "schema": "jaffle_shop",
+ "warehouse_id": "123456789abcdef0",
+}
def mock_dict(d: dict):
@@ -128,6 +133,61 @@ class TestDatabricksSubmitRunOperator(unittest.TestCase):
assert expected == op.json
+ def test_init_with_dbt_task_named_parameters(self):
+ """
+ Test the initializer with ``dbt_task`` and ``git_source`` passed as named parameters.
+ """
+ git_source = {
+ 'git_url': 'https://github.com/dbt-labs/jaffle_shop',
+ 'git_provider': 'github',
+ 'git_branch': 'main',
+ }
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK, git_source=git_source
+ )
+ expected = utils.normalise_json_content(
+ {'new_cluster': NEW_CLUSTER, 'dbt_task': DBT_TASK, 'git_source': git_source, 'run_name': TASK_ID}
+ )
+
+ assert expected == op.json
+
+ def test_init_with_dbt_task_mixed_parameters(self):
+ """
+ Test the initializer with ``dbt_task`` as a named parameter and ``git_source`` supplied via ``json``.
+ """
+ git_source = {
+ 'git_url': 'https://github.com/dbt-labs/jaffle_shop',
+ 'git_provider': 'github',
+ 'git_branch': 'main',
+ }
+ json = {'git_source': git_source}
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK, json=json
+ )
+ expected = utils.normalise_json_content(
+ {'new_cluster': NEW_CLUSTER, 'dbt_task': DBT_TASK, 'git_source': git_source, 'run_name': TASK_ID}
+ )
+
+ assert expected == op.json
+
+ def test_init_with_dbt_task_without_git_source_raises_error(self):
+ """
+ Test that the initializer raises an error when ``dbt_task`` is given without the required ``git_source``.
+ """
+ exception_message = "git_source is required for dbt_task"
+ with pytest.raises(AirflowException, match=exception_message):
+ DatabricksSubmitRunOperator(task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK)
+
+ def test_init_with_dbt_task_json_without_git_source_raises_error(self):
+ """
+ Test that the initializer raises an error when ``dbt_task`` is passed via ``json`` without the required ``git_source``.
+ """
+ json = {'dbt_task': DBT_TASK, 'new_cluster': NEW_CLUSTER}
+
+ exception_message = "git_source is required for dbt_task"
+ with pytest.raises(AirflowException, match=exception_message):
+ DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
+
def test_init_with_json(self):
"""
Test the initializer with json data.
@@ -158,7 +218,7 @@ class TestDatabricksSubmitRunOperator(unittest.TestCase):
def test_pipeline_task(self):
"""
- Test the initializer with a specified run_name.
+ Test the initializer with a pipeline task.
"""
pipeline_task = {"pipeline_id": "test-dlt"}
json = {'new_cluster': NEW_CLUSTER, 'run_name': RUN_NAME, "pipeline_task": pipeline_task}