Posted to commits@airflow.apache.org by ka...@apache.org on 2022/04/25 00:03:40 UTC

[airflow] branch main updated: Further improvement of Databricks Jobs operators (#23199)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f02b0b6b40 Further improvement of Databricks Jobs operators (#23199)
f02b0b6b40 is described below

commit f02b0b6b4054bd3038fc3fec85adef7502ea0c3c
Author: Alex Ott <al...@gmail.com>
AuthorDate: Mon Apr 25 02:03:18 2022 +0200

    Further improvement of Databricks Jobs operators (#23199)
    
    This PR includes the following changes:
    
    * Document missed parameters for `DatabricksSubmitRunOperator` and
      `DatabricksRunNowOperator`
    * Add support for new parameters in `DatabricksRunNowOperator`:
      `python_named_parameters` and `idempotency_token`
    * Rework documentation for both operators based on the feedback from
      another PR
---
 .../providers/databricks/operators/databricks.py   | 36 ++++++++++---
 .../operators/run_now.rst                          | 42 ++++-----------
 .../operators/submit_run.rst                       | 60 +++++++---------------
 3 files changed, 60 insertions(+), 78 deletions(-)
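
The constructor change in the diff below copies each optional argument into
the request JSON only when it is not None. A minimal standalone sketch of
that pattern follows. It does not import Airflow: the function name
build_run_now_payload is hypothetical, and the two checks (exactly one of
job_id or job_name, token length of at most 64 characters) are illustrative
assumptions drawn from the documentation text in this commit, not checks
this commit adds to the operator.

```python
from typing import Any, Dict, List, Optional


def build_run_now_payload(
    job_id: Optional[str] = None,
    job_name: Optional[str] = None,
    python_params: Optional[List[str]] = None,
    python_named_parameters: Optional[Dict[str, str]] = None,
    idempotency_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper that mimics how the operator assembles its JSON."""
    # Assumption: the docs name job_id and job_name as alternatives,
    # so this sketch requires exactly one of them.
    if (job_id is None) == (job_name is None):
        raise ValueError("Provide exactly one of job_id or job_name")
    # Assumption: the docstring says the token has at most 64 characters;
    # this client-side check is illustrative only.
    if idempotency_token is not None and len(idempotency_token) > 64:
        raise ValueError("idempotency_token must have at most 64 characters")

    payload: Dict[str, Any] = {}
    if job_id is not None:
        payload["job_id"] = job_id
    if job_name is not None:
        payload["job_name"] = job_name
    # Same "only set when given" pattern as in the diff.
    if python_params is not None:
        payload["python_params"] = python_params
    if python_named_parameters is not None:
        payload["python_named_parameters"] = python_named_parameters
    if idempotency_token is not None:
        payload["idempotency_token"] = idempotency_token
    return payload


example = build_run_now_payload(
    job_id="42",
    python_named_parameters={"name": "john doe", "age": "35"},
    idempotency_token="nightly-2022-04-25",
)
print(example)
```

Arguments left as None never appear in the payload, so the job's own
settings stay in effect for them.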

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index 6e00211567..f9e4f24764 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -259,6 +259,14 @@ class DatabricksSubmitRunOperator(BaseOperator):
         By default this will be set to the Airflow ``task_id``. This ``task_id`` is a
         required parameter of the superclass ``BaseOperator``.
         This field will be templated.
+    :param idempotency_token: an optional token that can be used to guarantee the idempotency of job run
+        requests. If a run with the provided token already exists, the request does not create a new run but
+        returns the ID of the existing run instead.  This token must have at most 64 characters.
+    :param access_control_list: optional list of dictionaries representing Access Control List (ACL) for
+        a given job run.  Each dictionary consists of the following fields: a specific subject (``user_name``
+        for users, or ``group_name`` for groups), and ``permission_level`` for that subject.  See the Jobs API
+        documentation for more details.
+    :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default.
     :param timeout_seconds: The timeout for this run. By default a value of 0 is used
         which means to have no timeout.
         This field will be templated.
@@ -437,9 +445,10 @@ class DatabricksRunNowOperator(BaseOperator):
         - ``json``
         - ``notebook_params``
         - ``python_params``
+        - ``python_named_parameters``
         - ``jar_params``
         - ``spark_submit_params``
-
+        - ``idempotency_token``
 
     :param job_id: the job_id of the existing Databricks job.
         This field will be templated.
@@ -476,12 +485,18 @@ class DatabricksRunNowOperator(BaseOperator):
     :param python_params: A list of parameters for jobs with python tasks,
         e.g. "python_params": ["john doe", "35"].
         The parameters will be passed to python file as command line parameters.
-        If specified upon run-now, it would overwrite the parameters specified in
-        job setting.
+        If specified upon run-now, it would overwrite the parameters specified in job setting.
         The json representation of this field (i.e. {"python_params":["john doe","35"]})
         cannot exceed 10,000 bytes.
         This field will be templated.
 
+        .. seealso::
+            https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
+    :param python_named_parameters: A dictionary of named parameters for jobs with python wheel tasks,
+        e.g. "python_named_parameters": {"name": "john doe", "age":  "35"}.
+        If specified upon run-now, it would overwrite the parameters specified in job setting.
+        This field will be templated.
+
         .. seealso::
             https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
     :param jar_params: A list of parameters for jobs with JAR tasks,
@@ -505,9 +520,9 @@ class DatabricksRunNowOperator(BaseOperator):
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
-    :param timeout_seconds: The timeout for this run. By default a value of 0 is used
-        which means to have no timeout.
-        This field will be templated.
+    :param idempotency_token: an optional token that can be used to guarantee the idempotency of job run
+        requests. If a run with the provided token already exists, the request does not create a new run but
+        returns the ID of the existing run instead.  This token must have at most 64 characters.
     :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
         By default and in the common case this will be ``databricks_default``. To use
         token based authentication, provide the key ``token`` in the extra field for the
@@ -516,8 +531,11 @@ class DatabricksRunNowOperator(BaseOperator):
         this run. By default the operator will poll every 30 seconds.
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
     :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
     :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
+    :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default.
     """
 
     # Used in airflow.models.BaseOperator
@@ -538,6 +556,8 @@ class DatabricksRunNowOperator(BaseOperator):
         python_params: Optional[List[str]] = None,
         jar_params: Optional[List[str]] = None,
         spark_submit_params: Optional[List[str]] = None,
+        python_named_parameters: Optional[Dict[str, str]] = None,
+        idempotency_token: Optional[str] = None,
         databricks_conn_id: str = 'databricks_default',
         polling_period_seconds: int = 30,
         databricks_retry_limit: int = 3,
@@ -567,10 +587,14 @@ class DatabricksRunNowOperator(BaseOperator):
             self.json['notebook_params'] = notebook_params
         if python_params is not None:
             self.json['python_params'] = python_params
+        if python_named_parameters is not None:
+            self.json['python_named_parameters'] = python_named_parameters
         if jar_params is not None:
             self.json['jar_params'] = jar_params
         if spark_submit_params is not None:
             self.json['spark_submit_params'] = spark_submit_params
+        if idempotency_token is not None:
+            self.json['idempotency_token'] = idempotency_token
 
         self.json = _deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
diff --git a/docs/apache-airflow-providers-databricks/operators/run_now.rst b/docs/apache-airflow-providers-databricks/operators/run_now.rst
index f77cfb2f53..8b2e6010ee 100644
--- a/docs/apache-airflow-providers-databricks/operators/run_now.rst
+++ b/docs/apache-airflow-providers-databricks/operators/run_now.rst
@@ -33,35 +33,15 @@ to call the ``api/2.1/jobs/run-now`` endpoint and pass it directly to our ``Data
 Another way to accomplish the same thing is to use the named parameters of the ``DatabricksRunNowOperator`` directly.
 Note that there is exactly one named parameter for each top level parameter in the ``jobs/run-now`` endpoint.
 
-.. list-table::
-   :widths: 15 25
-   :header-rows: 1
+The only required parameters are either:
 
-   * - Parameter
-     - Input
-   * - job_id: str
-     - ID of the existing Databricks jobs (required if ``job_name`` isn't provided).
-   * - job_name: str
-     - Name of the existing Databricks job (required if ``job_id`` isn't provided). It will throw exception if job isn't found, of if there are multiple jobs with the same name.
-   * - jar_params: list[str]
-     - A list of parameters for jobs with JAR tasks, e.g. ``"jar_params": ["john doe", "35"]``. The parameters will be passed to JAR file as command line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The json representation of this field (i.e. ``{"jar_params":["john doe","35"]}``) cannot exceed 10,000 bytes. This field will be templated.
-   * - notebook_params: dict[str,str]
-     - A dict from keys to values for jobs with notebook task, e.g.``"notebook_params": {"name": "john doe", "age":  "35"}```. The map is passed to the notebook and will be accessible through the ``dbutils.widgets.get function``. See `Widgets <https://docs.databricks.com/notebooks/widgets.html>`_ for more information. If not specified upon run-now, the triggered run will use the job’s base parameters. ``notebook_params`` cannot be specified in conjunction with ``jar_params``. The json re [...]
-   * - python_params: list[str]
-     - A list of parameters for jobs with python tasks, e.g. ``"python_params": ["john doe", "35"]``. The parameters will be passed to python file as command line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The json representation of this field (i.e. ``{"python_params":["john doe","35"]}``) cannot exceed 10,000 bytes. This field will be templated.
-   * - spark_submit_params: list[str]
-     - A list of parameters for jobs with spark submit task,  e.g. ``"spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]``. The parameters will be passed to spark-submit script as command line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The json representation of this field cannot exceed 10,000 bytes. This field will be templated.
-   * - timeout_seconds: int
-     - The timeout for this run. By default a value of 0 is used  which means to have no timeout. This field will be templated.
-   * - databricks_conn_id: string
-     - the name of the Airflow connection to use
-   * - polling_period_seconds: integer
-     - controls the rate which we poll for the result of this run
-   * - databricks_retry_limit: integer
-     - amount of times retry if the Databricks backend is unreachable
-   * - databricks_retry_delay: decimal
-     - number of seconds to wait between retries
-   * - databricks_retry_args: dict
-     - An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
-   * - do_xcom_push: boolean
-     - whether we should push run_id and run_page_url to xcom
+* ``job_id`` - ID of the existing Databricks job
+* ``job_name`` - Name of the existing Databricks job. It will throw an exception if the job isn't found, or if there are multiple jobs with the same name.
+
+All other parameters are optional and described in the documentation for ``DatabricksRunNowOperator``.  For example, you can pass additional parameters to a job using one of the following parameters, depending on the type of tasks in the job:
+
+* ``notebook_params``
+* ``python_params``
+* ``python_named_parameters``
+* ``jar_params``
+* ``spark_submit_params``
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index 497f72f352..da71194da7 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -31,49 +31,27 @@ Using the Operator
 ------------------
 
 There are two ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
-to call the ``api/2.1/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the ``json`` parameter.
+to call the ``api/2.1/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the
+``json`` parameter.  With this approach you get full control over the underlying payload to the Jobs REST API, including
+execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of type checking.
 
 Another way to accomplish the same thing is to use the named parameters of the ``DatabricksSubmitRunOperator`` directly. Note that there is exactly
-one named parameter for each top level parameter in the ``runs/submit`` endpoint.
-
-.. list-table::
-   :widths: 25 25
-   :header-rows: 1
-
-   * - Parameter
-     - Input
-   * - spark_jar_task: dict
-     - main class and parameters for the JAR task
-   * - notebook_task: dict
-     - notebook path and parameters for the task
-   * - spark_python_task: dict
-     - python file path and parameters to run the python file with
-   * - spark_submit_task: dict
-     - parameters needed to run a spark-submit command
-   * - pipeline_task: dict
-     - parameters needed to run a Delta Live Tables pipeline
-   * - new_cluster: dict
-     - specs for a new cluster on which this task will be run
-   * - existing_cluster_id: string
-     - ID for existing cluster on which to run this task
-   * - libraries: list of dict
-     - libraries which this run will use
-   * - run_name: string
-     - run name used for this task
-   * - timeout_seconds: integer
-     - The timeout for this run
-   * - databricks_conn_id: string
-     - the name of the Airflow connection to use
-   * - polling_period_seconds: integer
-     - controls the rate which we poll for the result of this run
-   * - databricks_retry_limit: integer
-     - amount of times retry if the Databricks backend is unreachable
-   * - databricks_retry_delay: decimal
-     - number of seconds to wait between retries
-   * - databricks_retry_args: dict
-     - An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
-   * - do_xcom_push: boolean
-     - whether we should push run_id and run_page_url to xcom
+one named parameter for each top level parameter in the ``runs/submit`` endpoint.  When using named parameters you must specify the following:
+
+* Task specification - it should be one of:
+
+  * ``spark_jar_task`` - main class and parameters for the JAR task
+  * ``notebook_task`` - notebook path and parameters for the task
+  * ``spark_python_task`` - python file path and parameters to run the python file with
+  * ``spark_submit_task`` - parameters needed to run a ``spark-submit`` command
+  * ``pipeline_task`` - parameters needed to run a Delta Live Tables pipeline
+
+* Cluster specification - it should be one of:
+  * ``new_cluster`` - specs for a new cluster on which this task will be run
+  * ``existing_cluster_id`` - ID for existing cluster on which to run this task
+
+All other parameters are optional, and described in the documentation of the ``DatabricksSubmitRunOperator`` class.
+
 
 Examples
 --------