You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/26 20:33:02 UTC

[airflow] branch main updated: DatabricksSubmitRunOperator dbt task support (#25623)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 25a9c6a905 DatabricksSubmitRunOperator dbt task support (#25623)
25a9c6a905 is described below

commit 25a9c6a9058b829fc038fdd3fc789890e563bd1d
Author: Gabor Ratky <ga...@databricks.com>
AuthorDate: Fri Aug 26 22:32:55 2022 +0200

    DatabricksSubmitRunOperator dbt task support (#25623)
---
 .../providers/databricks/operators/databricks.py   | 23 ++++++--
 .../operators/submit_run.rst                       |  4 +-
 newsfragments/25623.improvement.rst                |  1 +
 .../databricks/operators/test_databricks.py        | 62 +++++++++++++++++++++-
 4 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index dd614dca01..a722f977f3 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -193,28 +193,28 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param spark_jar_task: The main class and parameters for the JAR task. Note that
         the actual JAR is specified in the ``libraries``.
         *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
-        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
         This field will be templated.
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobssparkjartask
     :param notebook_task: The notebook path and parameters for the notebook task.
         *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
-        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
         This field will be templated.
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobsnotebooktask
     :param spark_python_task: The python file path and parameters to run the python file with.
         *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
-        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
         This field will be templated.
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobssparkpythontask
     :param spark_submit_task: Parameters needed to run a spark-submit command.
         *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
-        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
         This field will be templated.
 
         .. seealso::
@@ -222,11 +222,18 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param pipeline_task: Parameters needed to execute a Delta Live Tables pipeline task.
         The provided dictionary must contain at least ``pipeline_id`` field!
         *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
-        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` should be specified.
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
         This field will be templated.
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/2.0/jobs.html#jobspipelinetask
+    :param dbt_task: Parameters needed to execute a dbt task.
+        The provided dictionary must contain at least the ``commands`` field, and the
+        ``git_source`` parameter must also be set.
+        *EITHER* ``spark_jar_task`` *OR* ``notebook_task`` *OR* ``spark_python_task``
+        *OR* ``spark_submit_task`` *OR* ``pipeline_task`` *OR* ``dbt_task`` should be specified.
+        This field will be templated.
+
     :param new_cluster: Specs for a new cluster on which this task will be run.
         *EITHER* ``new_cluster`` *OR* ``existing_cluster_id`` should be specified
         (except when ``pipeline_task`` is used).
@@ -295,6 +302,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         spark_python_task: Optional[Dict[str, Union[str, List[str]]]] = None,
         spark_submit_task: Optional[Dict[str, List[str]]] = None,
         pipeline_task: Optional[Dict[str, str]] = None,
+        dbt_task: Optional[Dict[str, Union[str, List[str]]]] = None,
         new_cluster: Optional[Dict[str, object]] = None,
         existing_cluster_id: Optional[str] = None,
         libraries: Optional[List[Dict[str, str]]] = None,
@@ -333,6 +341,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
             self.json['spark_submit_task'] = spark_submit_task
         if pipeline_task is not None:
             self.json['pipeline_task'] = pipeline_task
+        if dbt_task is not None:
+            self.json['dbt_task'] = dbt_task
         if new_cluster is not None:
             self.json['new_cluster'] = new_cluster
         if existing_cluster_id is not None:
@@ -352,6 +362,9 @@ class DatabricksSubmitRunOperator(BaseOperator):
         if git_source is not None:
             self.json['git_source'] = git_source
 
+        if 'dbt_task' in self.json and 'git_source' not in self.json:
+            raise AirflowException('git_source is required for dbt_task')
+
         self.json = normalise_json_content(self.json)
         # This variable will be used in case our task gets killed.
         self.run_id: Optional[int] = None
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index fe7e23a58c..3e13930235 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -55,6 +55,7 @@ one named parameter for each top level parameter in the ``runs/submit`` endpoint
   * ``spark_python_task`` - python file path and parameters to run the python file with
   * ``spark_submit_task`` - parameters needed to run a ``spark-submit`` command
   * ``pipeline_task`` - parameters needed to run a Delta Live Tables pipeline
+  * ``dbt_task`` - parameters needed to run a dbt project
 
 * Cluster specification - it should be one of:
   * ``new_cluster`` - specs for a new cluster on which this task will be run
@@ -68,9 +69,10 @@ Currently the named parameters that ``DatabricksSubmitRunOperator`` supports are
     - ``spark_jar_task``
     - ``notebook_task``
     - ``spark_python_task``
-    - ``spark_jar_task``
     - ``spark_submit_task``
     - ``pipeline_task``
+    - ``dbt_task``
+    - ``git_source``
     - ``new_cluster``
     - ``existing_cluster_id``
     - ``libraries``
diff --git a/newsfragments/25623.improvement.rst b/newsfragments/25623.improvement.rst
new file mode 100644
index 0000000000..4cfa738dc8
--- /dev/null
+++ b/newsfragments/25623.improvement.rst
@@ -0,0 +1 @@
+``DatabricksSubmitRunOperator`` now supports ``dbt_task`` to run dbt projects on Databricks.
diff --git a/tests/providers/databricks/operators/test_databricks.py b/tests/providers/databricks/operators/test_databricks.py
index 875ee8c87d..f72487196f 100644
--- a/tests/providers/databricks/operators/test_databricks.py
+++ b/tests/providers/databricks/operators/test_databricks.py
@@ -64,6 +64,11 @@ RENDERED_TEMPLATED_JAR_PARAMS = [f'/test-{DATE}']
 TEMPLATED_JAR_PARAMS = ['/test-{{ ds }}']
 PYTHON_PARAMS = ["john doe", "35"]
 SPARK_SUBMIT_PARAMS = ["--class", "org.apache.spark.examples.SparkPi"]
+DBT_TASK = {
+    "commands": ["dbt deps", "dbt seed", "dbt run"],
+    "schema": "jaffle_shop",
+    "warehouse_id": "123456789abcdef0",
+}
 
 
 def mock_dict(d: dict):
@@ -128,6 +133,61 @@ class TestDatabricksSubmitRunOperator(unittest.TestCase):
 
         assert expected == op.json
 
+    def test_init_with_dbt_task_named_parameters(self):
+        """
+        Test the initializer with a dbt task passed as named parameters.
+        """
+        git_source = {
+            'git_url': 'https://github.com/dbt-labs/jaffle_shop',
+            'git_provider': 'github',
+            'git_branch': 'main',
+        }
+        op = DatabricksSubmitRunOperator(
+            task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK, git_source=git_source
+        )
+        expected = utils.normalise_json_content(
+            {'new_cluster': NEW_CLUSTER, 'dbt_task': DBT_TASK, 'git_source': git_source, 'run_name': TASK_ID}
+        )
+
+        assert expected == op.json
+
+    def test_init_with_dbt_task_mixed_parameters(self):
+        """
+        Test the initializer with dbt_task as a named parameter and git_source passed via json.
+        """
+        git_source = {
+            'git_url': 'https://github.com/dbt-labs/jaffle_shop',
+            'git_provider': 'github',
+            'git_branch': 'main',
+        }
+        json = {'git_source': git_source}
+        op = DatabricksSubmitRunOperator(
+            task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK, json=json
+        )
+        expected = utils.normalise_json_content(
+            {'new_cluster': NEW_CLUSTER, 'dbt_task': DBT_TASK, 'git_source': git_source, 'run_name': TASK_ID}
+        )
+
+        assert expected == op.json
+
+    def test_init_with_dbt_task_without_git_source_raises_error(self):
+        """
+        Test that the initializer raises an error when dbt_task is given without git_source.
+        """
+        exception_message = "git_source is required for dbt_task"
+        with pytest.raises(AirflowException, match=exception_message):
+            DatabricksSubmitRunOperator(task_id=TASK_ID, new_cluster=NEW_CLUSTER, dbt_task=DBT_TASK)
+
+    def test_init_with_dbt_task_json_without_git_source_raises_error(self):
+        """
+        Test that the initializer raises an error when dbt_task is passed via json without git_source.
+        """
+        json = {'dbt_task': DBT_TASK, 'new_cluster': NEW_CLUSTER}
+
+        exception_message = "git_source is required for dbt_task"
+        with pytest.raises(AirflowException, match=exception_message):
+            DatabricksSubmitRunOperator(task_id=TASK_ID, json=json)
+
     def test_init_with_json(self):
         """
         Test the initializer with json data.
@@ -158,7 +218,7 @@ class TestDatabricksSubmitRunOperator(unittest.TestCase):
 
     def test_pipeline_task(self):
         """
-        Test the initializer with a specified run_name.
+        Test the initializer with a pipeline task.
         """
         pipeline_task = {"pipeline_id": "test-dlt"}
         json = {'new_cluster': NEW_CLUSTER, 'run_name': RUN_NAME, "pipeline_task": pipeline_task}