Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/27 05:26:57 UTC

[airflow] branch main updated: More improvements in the Databricks operators (#25260)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 54a8c4fd2a More improvements in the Databricks operators (#25260)
54a8c4fd2a is described below

commit 54a8c4fd2a1d1af6166f43d588dca8ce24bd058b
Author: Alex Ott <al...@gmail.com>
AuthorDate: Wed Jul 27 07:26:43 2022 +0200

    More improvements in the Databricks operators (#25260)
---
 airflow/providers/databricks/operators/databricks.py       | 12 ++++++------
 airflow/providers/databricks/operators/databricks_repos.py | 12 ++++++------
 airflow/providers/databricks/operators/databricks_sql.py   | 14 +++++++++++---
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index 1ecf23639d..53fe8b0cb3 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -530,8 +530,8 @@ class DatabricksRunNowOperator(BaseOperator):
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
-    :param python_named_parameters: A list of parameters for jobs with python wheel tasks,
-        e.g. "python_named_parameters": {"name": "john doe", "age":  "35"}.
+    :param python_named_params: A list of named parameters for jobs with python wheel tasks,
+        e.g. "python_named_params": {"name": "john doe", "age":  "35"}.
         If specified upon run-now, it would overwrite the parameters specified in job setting.
         This field will be templated.
 
@@ -566,7 +566,7 @@ class DatabricksRunNowOperator(BaseOperator):
         token based authentication, provide the key ``token`` in the extra field for the
         connection and create the key ``host`` and leave the ``host`` field empty. (templated)
     :param polling_period_seconds: Controls the rate which we poll for the result of
-        this run. By default the operator will poll every 30 seconds.
+        this run. By default, the operator will poll every 30 seconds.
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
     :param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -594,7 +594,7 @@ class DatabricksRunNowOperator(BaseOperator):
         python_params: Optional[List[str]] = None,
         jar_params: Optional[List[str]] = None,
         spark_submit_params: Optional[List[str]] = None,
-        python_named_parameters: Optional[Dict[str, str]] = None,
+        python_named_params: Optional[Dict[str, str]] = None,
         idempotency_token: Optional[str] = None,
         databricks_conn_id: str = 'databricks_default',
         polling_period_seconds: int = 30,
@@ -625,8 +625,8 @@ class DatabricksRunNowOperator(BaseOperator):
             self.json['notebook_params'] = notebook_params
         if python_params is not None:
             self.json['python_params'] = python_params
-        if python_named_parameters is not None:
-            self.json['python_named_parameters'] = python_named_parameters
+        if python_named_params is not None:
+            self.json['python_named_params'] = python_named_params
         if jar_params is not None:
             self.json['jar_params'] = jar_params
         if spark_submit_params is not None:
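
The hunk above renames the operator argument ``python_named_parameters`` to
``python_named_params``, matching the field name in the Jobs run-now payload
linked from the docstring. A minimal usage sketch, assuming a pre-existing
Databricks job (the ``job_id`` is hypothetical; the dict mirrors the
docstring's own example):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

    with DAG(
        dag_id="databricks_run_now_sketch",
        start_date=datetime(2022, 7, 27),
        schedule_interval=None,
    ) as dag:
        # Was `python_named_parameters` before this commit; the dict is
        # passed through to the job's python wheel task.
        run_now = DatabricksRunNowOperator(
            task_id="run_now",
            job_id=42,  # hypothetical job id of an existing job
            python_named_params={"name": "john doe", "age": "35"},
        )
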
diff --git a/airflow/providers/databricks/operators/databricks_repos.py b/airflow/providers/databricks/operators/databricks_repos.py
index 32100bcb34..02527123ef 100644
--- a/airflow/providers/databricks/operators/databricks_repos.py
+++ b/airflow/providers/databricks/operators/databricks_repos.py
@@ -47,7 +47,7 @@ class DatabricksReposCreateOperator(BaseOperator):
     :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
         By default and in the common case this will be ``databricks_default``. To use
         token based authentication, provide the key ``token`` in the extra field for the
-        connection and create the key ``host`` and leave the ``host`` field empty.
+        connection and create the key ``host`` and leave the ``host`` field empty. (templated)
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
     :param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -55,7 +55,7 @@ class DatabricksReposCreateOperator(BaseOperator):
     """
 
     # Used in airflow.models.BaseOperator
-    template_fields: Sequence[str] = ('repo_path', 'tag', 'branch')
+    template_fields: Sequence[str] = ('repo_path', 'tag', 'branch', 'databricks_conn_id')
 
     __git_providers__ = {
         "github.com": "gitHub",
@@ -175,7 +175,7 @@ class DatabricksReposUpdateOperator(BaseOperator):
     :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
         By default and in the common case this will be ``databricks_default``. To use
         token based authentication, provide the key ``token`` in the extra field for the
-        connection and create the key ``host`` and leave the ``host`` field empty.
+        connection and create the key ``host`` and leave the ``host`` field empty.  (templated)
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
     :param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -183,7 +183,7 @@ class DatabricksReposUpdateOperator(BaseOperator):
     """
 
     # Used in airflow.models.BaseOperator
-    template_fields: Sequence[str] = ('repo_path', 'tag', 'branch')
+    template_fields: Sequence[str] = ('repo_path', 'tag', 'branch', 'databricks_conn_id')
 
     def __init__(
         self,
@@ -249,7 +249,7 @@ class DatabricksReposDeleteOperator(BaseOperator):
     :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
         By default and in the common case this will be ``databricks_default``. To use
         token based authentication, provide the key ``token`` in the extra field for the
-        connection and create the key ``host`` and leave the ``host`` field empty.
+        connection and create the key ``host`` and leave the ``host`` field empty. (templated)
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
     :param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -257,7 +257,7 @@ class DatabricksReposDeleteOperator(BaseOperator):
     """
 
     # Used in airflow.models.BaseOperator
-    template_fields: Sequence[str] = ('repo_path',)
+    template_fields: Sequence[str] = ('repo_path', 'databricks_conn_id')
 
     def __init__(
         self,
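
With ``databricks_conn_id`` added to ``template_fields``, the connection id
itself can now be rendered with Jinja, e.g. to pick a connection per
environment. A minimal sketch, assuming a hypothetical Airflow Variable
``databricks_env`` holding a suffix such as ``prod`` (the repo path is also
hypothetical):

    from airflow.providers.databricks.operators.databricks_repos import (
        DatabricksReposUpdateOperator,
    )

    # The conn id is rendered at runtime now that it is a template field;
    # the Variable name and repo path are illustrative only.
    update_repo = DatabricksReposUpdateOperator(
        task_id="update_repo",
        repo_path="/Repos/user@example.com/demo-repo",
        branch="main",
        databricks_conn_id="databricks_{{ var.value.databricks_env }}",
    )
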
diff --git a/airflow/providers/databricks/operators/databricks_sql.py b/airflow/providers/databricks/operators/databricks_sql.py
index 6ee125a35e..4c36e00e27 100644
--- a/airflow/providers/databricks/operators/databricks_sql.py
+++ b/airflow/providers/databricks/operators/databricks_sql.py
@@ -42,7 +42,7 @@ class DatabricksSqlOperator(BaseOperator):
         :ref:`howto/operator:DatabricksSqlOperator`
 
     :param databricks_conn_id: Reference to
-        :ref:`Databricks connection id<howto/connection:databricks>`
+        :ref:`Databricks connection id<howto/connection:databricks>` (templated)
     :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
         If not specified, it should be either specified in the Databricks connection's extra parameters,
         or ``sql_endpoint_name`` must be specified.
@@ -65,7 +65,14 @@ class DatabricksSqlOperator(BaseOperator):
     :param csv_params: parameters that will be passed to the ``csv.DictWriter`` class used to write CSV data.
     """
 
-    template_fields: Sequence[str] = ('sql', '_output_path', 'schema', 'catalog', 'http_headers')
+    template_fields: Sequence[str] = (
+        'sql',
+        '_output_path',
+        'schema',
+        'catalog',
+        'http_headers',
+        'databricks_conn_id',
+    )
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'sql': 'sql'}
 
@@ -179,7 +186,7 @@ class DatabricksCopyIntoOperator(BaseOperator):
     :param file_format: Required file format. Supported formats are
         ``CSV``, ``JSON``, ``AVRO``, ``ORC``, ``PARQUET``, ``TEXT``, ``BINARYFILE``.
     :param databricks_conn_id: Reference to
-        :ref:`Databricks connection id<howto/connection:databricks>`
+        :ref:`Databricks connection id<howto/connection:databricks>` (templated)
     :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
         If not specified, it should be either specified in the Databricks connection's extra parameters,
         or ``sql_endpoint_name`` must be specified.
@@ -210,6 +217,7 @@ class DatabricksCopyIntoOperator(BaseOperator):
         '_file_location',
         '_files',
         '_table_name',
+        'databricks_conn_id',
     )
 
     def __init__(
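
The same templating now applies to the SQL operators: both
``DatabricksSqlOperator`` and ``DatabricksCopyIntoOperator`` accept a Jinja
expression for the connection id. A minimal sketch (the endpoint name, query,
and Variable are hypothetical):

    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    # sql_endpoint_name and the templated Variable are illustrative only;
    # the conn id is resolved when the task instance is rendered.
    select_one = DatabricksSqlOperator(
        task_id="select_one",
        sql="SELECT 1",
        sql_endpoint_name="demo-endpoint",
        databricks_conn_id="databricks_{{ var.value.databricks_env }}",
    )
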