You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/27 05:26:57 UTC
[airflow] branch main updated: More improvements in the Databricks operators (#25260)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 54a8c4fd2a More improvements in the Databricks operators (#25260)
54a8c4fd2a is described below
commit 54a8c4fd2a1d1af6166f43d588dca8ce24bd058b
Author: Alex Ott <al...@gmail.com>
AuthorDate: Wed Jul 27 07:26:43 2022 +0200
More improvements in the Databricks operators (#25260)
---
airflow/providers/databricks/operators/databricks.py | 12 ++++++------
airflow/providers/databricks/operators/databricks_repos.py | 12 ++++++------
airflow/providers/databricks/operators/databricks_sql.py | 14 +++++++++++---
3 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index 1ecf23639d..53fe8b0cb3 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -530,8 +530,8 @@ class DatabricksRunNowOperator(BaseOperator):
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
- :param python_named_parameters: A list of parameters for jobs with python wheel tasks,
- e.g. "python_named_parameters": {"name": "john doe", "age": "35"}.
+ :param python_named_params: A dictionary of named parameters for jobs with python wheel tasks,
+ e.g. "python_named_params": {"name": "john doe", "age": "35"}.
If specified upon run-now, it would overwrite the parameters specified in job setting.
This field will be templated.
@@ -566,7 +566,7 @@ class DatabricksRunNowOperator(BaseOperator):
token based authentication, provide the key ``token`` in the extra field for the
connection and create the key ``host`` and leave the ``host`` field empty. (templated)
:param polling_period_seconds: Controls the rate at which we poll for the result of
- this run. By default the operator will poll every 30 seconds.
+ this run. By default, the operator will poll every 30 seconds.
:param databricks_retry_limit: Number of times to retry if the Databricks backend is
unreachable. Its value must be greater than or equal to 1.
:param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -594,7 +594,7 @@ class DatabricksRunNowOperator(BaseOperator):
python_params: Optional[List[str]] = None,
jar_params: Optional[List[str]] = None,
spark_submit_params: Optional[List[str]] = None,
- python_named_parameters: Optional[Dict[str, str]] = None,
+ python_named_params: Optional[Dict[str, str]] = None,
idempotency_token: Optional[str] = None,
databricks_conn_id: str = 'databricks_default',
polling_period_seconds: int = 30,
@@ -625,8 +625,8 @@ class DatabricksRunNowOperator(BaseOperator):
self.json['notebook_params'] = notebook_params
if python_params is not None:
self.json['python_params'] = python_params
- if python_named_parameters is not None:
- self.json['python_named_parameters'] = python_named_parameters
+ if python_named_params is not None:
+ self.json['python_named_params'] = python_named_params
if jar_params is not None:
self.json['jar_params'] = jar_params
if spark_submit_params is not None:
diff --git a/airflow/providers/databricks/operators/databricks_repos.py b/airflow/providers/databricks/operators/databricks_repos.py
index 32100bcb34..02527123ef 100644
--- a/airflow/providers/databricks/operators/databricks_repos.py
+++ b/airflow/providers/databricks/operators/databricks_repos.py
@@ -47,7 +47,7 @@ class DatabricksReposCreateOperator(BaseOperator):
:param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
By default and in the common case this will be ``databricks_default``. To use
token based authentication, provide the key ``token`` in the extra field for the
- connection and create the key ``host`` and leave the ``host`` field empty.
+ connection and create the key ``host`` and leave the ``host`` field empty. (templated)
:param databricks_retry_limit: Number of times to retry if the Databricks backend is
unreachable. Its value must be greater than or equal to 1.
:param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -55,7 +55,7 @@ class DatabricksReposCreateOperator(BaseOperator):
"""
# Used in airflow.models.BaseOperator
- template_fields: Sequence[str] = ('repo_path', 'tag', 'branch')
+ template_fields: Sequence[str] = ('repo_path', 'tag', 'branch', 'databricks_conn_id')
__git_providers__ = {
"github.com": "gitHub",
@@ -175,7 +175,7 @@ class DatabricksReposUpdateOperator(BaseOperator):
:param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
By default and in the common case this will be ``databricks_default``. To use
token based authentication, provide the key ``token`` in the extra field for the
- connection and create the key ``host`` and leave the ``host`` field empty.
+ connection and create the key ``host`` and leave the ``host`` field empty. (templated)
:param databricks_retry_limit: Number of times to retry if the Databricks backend is
unreachable. Its value must be greater than or equal to 1.
:param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -183,7 +183,7 @@ class DatabricksReposUpdateOperator(BaseOperator):
"""
# Used in airflow.models.BaseOperator
- template_fields: Sequence[str] = ('repo_path', 'tag', 'branch')
+ template_fields: Sequence[str] = ('repo_path', 'tag', 'branch', 'databricks_conn_id')
def __init__(
self,
@@ -249,7 +249,7 @@ class DatabricksReposDeleteOperator(BaseOperator):
:param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`.
By default and in the common case this will be ``databricks_default``. To use
token based authentication, provide the key ``token`` in the extra field for the
- connection and create the key ``host`` and leave the ``host`` field empty.
+ connection and create the key ``host`` and leave the ``host`` field empty. (templated)
:param databricks_retry_limit: Number of times to retry if the Databricks backend is
unreachable. Its value must be greater than or equal to 1.
:param databricks_retry_delay: Number of seconds to wait between retries (it
@@ -257,7 +257,7 @@ class DatabricksReposDeleteOperator(BaseOperator):
"""
# Used in airflow.models.BaseOperator
- template_fields: Sequence[str] = ('repo_path',)
+ template_fields: Sequence[str] = ('repo_path', 'databricks_conn_id')
def __init__(
self,
diff --git a/airflow/providers/databricks/operators/databricks_sql.py b/airflow/providers/databricks/operators/databricks_sql.py
index 6ee125a35e..4c36e00e27 100644
--- a/airflow/providers/databricks/operators/databricks_sql.py
+++ b/airflow/providers/databricks/operators/databricks_sql.py
@@ -42,7 +42,7 @@ class DatabricksSqlOperator(BaseOperator):
:ref:`howto/operator:DatabricksSqlOperator`
:param databricks_conn_id: Reference to
- :ref:`Databricks connection id<howto/connection:databricks>`
+ :ref:`Databricks connection id<howto/connection:databricks>` (templated)
:param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
If not specified, it should be either specified in the Databricks connection's extra parameters,
or ``sql_endpoint_name`` must be specified.
@@ -65,7 +65,14 @@ class DatabricksSqlOperator(BaseOperator):
:param csv_params: parameters that will be passed to the ``csv.DictWriter`` class used to write CSV data.
"""
- template_fields: Sequence[str] = ('sql', '_output_path', 'schema', 'catalog', 'http_headers')
+ template_fields: Sequence[str] = (
+ 'sql',
+ '_output_path',
+ 'schema',
+ 'catalog',
+ 'http_headers',
+ 'databricks_conn_id',
+ )
template_ext: Sequence[str] = ('.sql',)
template_fields_renderers = {'sql': 'sql'}
@@ -179,7 +186,7 @@ class DatabricksCopyIntoOperator(BaseOperator):
:param file_format: Required file format. Supported formats are
``CSV``, ``JSON``, ``AVRO``, ``ORC``, ``PARQUET``, ``TEXT``, ``BINARYFILE``.
:param databricks_conn_id: Reference to
- :ref:`Databricks connection id<howto/connection:databricks>`
+ :ref:`Databricks connection id<howto/connection:databricks>` (templated)
:param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
If not specified, it should be either specified in the Databricks connection's extra parameters,
or ``sql_endpoint_name`` must be specified.
@@ -210,6 +217,7 @@ class DatabricksCopyIntoOperator(BaseOperator):
'_file_location',
'_files',
'_table_name',
+ 'databricks_conn_id',
)
def __init__(