Posted to commits@airflow.apache.org by el...@apache.org on 2023/12/29 12:53:54 UTC
(airflow) branch main updated: change spark connection form and add spark connections docs (#36419)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ed9080ae6a change spark connection form and add spark connections docs (#36419)
ed9080ae6a is described below
commit ed9080ae6a17d7b2478652b676579f162462bb70
Author: shohamy7 <46...@users.noreply.github.com>
AuthorDate: Fri Dec 29 14:53:44 2023 +0200
change spark connection form and add spark connections docs (#36419)
* change spark connection form and add spark connections docs
* make SQL letter uppercase in spark-sql connection header
* rename from spark to spark-submit and add default values in connection form
---
airflow/providers/apache/spark/hooks/spark_jdbc.py | 2 +-
airflow/providers/apache/spark/hooks/spark_sql.py | 25 +++++++++++
.../providers/apache/spark/hooks/spark_submit.py | 45 +++++++++++++++++---
.../providers/apache/spark/operators/spark_jdbc.py | 2 +-
.../apache/spark/operators/spark_submit.py | 2 +-
.../connections/index.rst | 28 +++++++++++++
.../connections/{spark.rst => spark-connect.rst} | 32 +++------------
.../connections/spark-sql.rst | 48 ++++++++++++++++++++++
.../connections/{spark.rst => spark-submit.rst} | 31 ++++++--------
.../index.rst | 2 +-
.../operators.rst | 4 +-
11 files changed, 166 insertions(+), 55 deletions(-)
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 8c8d02f1ec..b904ca4260 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -29,7 +29,7 @@ class SparkJDBCHook(SparkSubmitHook):
Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark.
:param spark_app_name: Name of the job (default airflow-spark-jdbc)
- :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+ :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
as configured in Airflow administration
:param spark_conf: Any additional Spark configuration properties
:param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py
index 41dc741ccd..46eec49f30 100644
--- a/airflow/providers/apache/spark/hooks/spark_sql.py
+++ b/airflow/providers/apache/spark/hooks/spark_sql.py
@@ -54,6 +54,31 @@ class SparkSqlHook(BaseHook):
conn_type = "spark_sql"
hook_name = "Spark SQL"
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ """Return custom field behaviour."""
+ return {
+ "hidden_fields": ["schema", "login", "password", "extra"],
+ "relabeling": {},
+ }
+
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Returns connection widgets to add to connection form."""
+ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+ from flask_babel import lazy_gettext
+ from wtforms import StringField
+ from wtforms.validators import Optional
+
+ return {
+ "queue": StringField(
+ lazy_gettext("YARN queue"),
+ widget=BS3TextFieldWidget(),
+ description="Default YARN queue to use",
+ validators=[Optional()],
+ )
+ }
+
def __init__(
self,
sql: str,
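For readers unfamiliar with this hook API: get_connection_form_widgets returns a mapping of extra-field names to WTForms fields, which the Airflow webserver renders on the connection form and stores in the connection's extra JSON. A minimal sketch of the same pattern for a hypothetical custom hook (the hook name and the "region" field are illustrative, not part of this commit):

from __future__ import annotations

from typing import Any

from airflow.hooks.base import BaseHook


class MyCustomHook(BaseHook):
    """Hypothetical hook illustrating the form-widget pattern used above."""

    conn_type = "my_custom"
    hook_name = "My Custom"

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection widgets to add to connection form."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import StringField
        from wtforms.validators import Optional

        # Each key becomes a field stored in the connection's "extra" JSON.
        return {
            "region": StringField(
                lazy_gettext("Region"),
                widget=BS3TextFieldWidget(),
                description="Illustrative extra field",
                validators=[Optional()],
            )
        }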
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index b495cafeb6..b96d992bba 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,7 +33,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
with contextlib.suppress(ImportError, NameError):
from airflow.providers.cncf.kubernetes import kube_client
-ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"]
+DEFAULT_SPARK_BINARY = "spark-submit"
+ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]
class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -41,7 +42,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
:param conf: Arbitrary Spark configuration properties
- :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+ :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
in Airflow administration. When an invalid connection_id is supplied, it will default
to yarn.
:param files: Upload additional files to the executor running the job, separated by a
@@ -98,10 +99,44 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
- "hidden_fields": ["schema", "login", "password"],
+ "hidden_fields": ["schema", "login", "password", "extra"],
"relabeling": {},
}
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Returns connection widgets to add to connection form."""
+ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+ from flask_babel import lazy_gettext
+ from wtforms import StringField
+ from wtforms.validators import Optional, any_of
+
+ return {
+ "queue": StringField(
+ lazy_gettext("YARN queue"),
+ widget=BS3TextFieldWidget(),
+ description="Default YARN queue to use",
+ validators=[Optional()],
+ ),
+ "deploy-mode": StringField(
+ lazy_gettext("Deploy mode"),
+ widget=BS3TextFieldWidget(),
+ description="Must be client or cluster",
+ validators=[any_of(["client", "cluster"])],
+ default="client",
+ ),
+ "spark-binary": StringField(
+ lazy_gettext("Spark binary"),
+ widget=BS3TextFieldWidget(),
+ description=f"Must be one of: {', '.join(ALLOWED_SPARK_BINARIES)}",
+ validators=[any_of(ALLOWED_SPARK_BINARIES)],
+ default=DEFAULT_SPARK_BINARY,
+ ),
+ "namespace": StringField(
+ lazy_gettext("Kubernetes namespace"), widget=BS3TextFieldWidget(), validators=[Optional()]
+ ),
+ }
+
def __init__(
self,
conf: dict[str, Any] | None = None,
@@ -198,7 +233,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
"master": "yarn",
"queue": None,
"deploy_mode": None,
- "spark_binary": self.spark_binary or "spark-submit",
+ "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
"namespace": None,
}
@@ -216,7 +251,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
conn_data["queue"] = self._queue if self._queue else extra.get("queue")
conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode")
if not self.spark_binary:
- self.spark_binary = extra.get("spark-binary", "spark-submit")
+ self.spark_binary = extra.get("spark-binary", DEFAULT_SPARK_BINARY)
if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py
index 4b4dd648a6..e5ff5f9c65 100644
--- a/airflow/providers/apache/spark/operators/spark_jdbc.py
+++ b/airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -37,7 +37,7 @@ class SparkJDBCOperator(SparkSubmitOperator):
:ref:`howto/operator:SparkJDBCOperator`
:param spark_app_name: Name of the job (default airflow-spark-jdbc)
- :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+ :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
as configured in Airflow administration
:param spark_conf: Any additional Spark configuration properties
:param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index be2f2d0ac5..bd8480b815 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ class SparkSubmitOperator(BaseOperator):
:param application: The application that submitted as a job, either jar or py file. (templated)
:param conf: Arbitrary Spark configuration properties (templated)
- :param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+ :param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
:param files: Upload additional files to the executor running the job, separated by a
comma. Files will be placed in the working directory of each executor.
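For context, a hedged usage sketch of SparkSubmitOperator against the renamed connection reference; the application path and conf values are placeholders:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

submit_job = SparkSubmitOperator(
    task_id="submit_job",
    application="/opt/spark/examples/src/main/python/pi.py",  # placeholder path
    conn_id="spark_default",  # see the spark-submit connection docs added later in this commit
    conf={"spark.executor.memory": "2g"},
)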
diff --git a/docs/apache-airflow-providers-apache-spark/connections/index.rst b/docs/apache-airflow-providers-apache-spark/connections/index.rst
new file mode 100644
index 0000000000..71716ec9d6
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/index.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Apache Spark Connections
+========================
+
+
+.. toctree::
+ :maxdepth: 1
+ :glob:
+
+ *
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
similarity index 51%
copy from docs/apache-airflow-providers-apache-spark/connections/spark.rst
copy to docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
index 05b92ce75c..aa5ef07157 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
@@ -17,34 +17,26 @@
-.. _howto/connection:spark:
+.. _howto/connection:spark-connect:
-Apache Spark Connection
-=======================
+Apache Spark Connect Connection
+===============================
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Connect connection type enables connection to Apache Spark via the Spark Connect interface.
Default Connection IDs
----------------------
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+The Spark Connect hook uses ``spark_connect_default`` by default.
Configuring the Connection
--------------------------
Host (required)
- The host to connect to, it can be ``local``, ``yarn`` or an URL.
+ The host to connect to; it should be a valid hostname.
Port (optional)
Specify the port in case the host is a URL.
-Extra (optional)
- Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
-
- * ``queue`` - The name of the YARN queue to which the application is submitted.
- * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
- * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
- * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
-
User ID (optional, only applies to Spark Connect)
The user ID to authenticate with the proxy.
@@ -54,18 +46,6 @@ Token (optional, only applies to Spark Connect)
Use SSL (optional, only applies to Spark Connect)
Whether to use SSL when connecting.
-When specifying the connection in environment variable you should specify
-it using URI syntax.
-
-Note that all components of the URI should be URL-encoded. The URI and the mongo
-connection string are not the same.
-
-For example:
-
-.. code-block:: bash
-
- export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
-
.. warning::
Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
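A hedged sketch of defining the Spark Connect connection described above in code; the hostname is a placeholder, and the "spark_connect" connection type and default gRPC port are assumptions about the provider's SparkConnectHook, not part of this commit:

from airflow.models.connection import Connection

spark_connect_conn = Connection(
    conn_id="spark_connect_default",
    conn_type="spark_connect",  # assumed conn_type of SparkConnectHook
    host="spark-connect.example.com",  # a valid hostname, per the page above
    port=15002,  # Spark Connect's default gRPC port
)
print(spark_connect_conn.get_uri())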
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
new file mode 100644
index 0000000000..c4e4c606de
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
@@ -0,0 +1,48 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/connection:spark-sql:
+
+Apache Spark SQL Connection
+===========================
+
+The Apache Spark SQL connection type enables connection to Apache Spark via the ``spark-sql`` command.
+
+Default Connection IDs
+----------------------
+
+SparkSqlHook uses ``spark_sql_default`` by default.
+
+Configuring the Connection
+--------------------------
+Host (required)
+ The host to connect to; it can be ``local``, ``yarn``, or a URL.
+
+Port (optional)
+ Specify the port in case the host is a URL.
+
+YARN Queue (optional, only applies to Spark on YARN applications)
+ The name of the YARN queue to which the application is submitted.
+
+.. warning::
+
+ Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
+ establish communication with external servers. It's crucial to understand that directing the connection towards a
+ malicious server can lead to significant security vulnerabilities, including the risk of encountering
+ Remote Code Execution (RCE) attacks.
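Since the hook diff above sets conn_type = "spark_sql", a minimal sketch of creating this connection programmatically (the host and queue values are placeholders):

import json

from airflow.models.connection import Connection

spark_sql_conn = Connection(
    conn_id="spark_sql_default",
    conn_type="spark_sql",
    host="yarn",
    extra=json.dumps({"queue": "default"}),  # maps to the new "YARN queue" form field
)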
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
similarity index 58%
rename from docs/apache-airflow-providers-apache-spark/connections/spark.rst
rename to docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
index 05b92ce75c..f38a2908ba 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
@@ -17,17 +17,17 @@
-.. _howto/connection:spark:
+.. _howto/connection:spark-submit:
-Apache Spark Connection
-=======================
+Apache Spark Submit Connection
+==============================
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Submit connection type enables connection to Apache Spark via the ``spark-submit`` command.
Default Connection IDs
----------------------
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default.
Configuring the Connection
--------------------------
@@ -37,22 +37,17 @@ Host (required)
Port (optional)
Specify the port in case the host is a URL.
-Extra (optional)
- Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
+YARN Queue (optional, only applies to Spark on YARN applications)
+ The name of the YARN queue to which the application is submitted.
- * ``queue`` - The name of the YARN queue to which the application is submitted.
- * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
- * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
- * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
+Deploy mode (optional)
+ Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
-User ID (optional, only applies to Spark Connect)
- The user ID to authenticate with the proxy.
+Spark binary (optional)
+ The command to use for Spark submit. Some distros may use ``spark2-submit``. Defaults to ``spark-submit``. Only ``spark-submit``, ``spark2-submit``, or ``spark3-submit`` are allowed as values.
-Token (optional, only applies to Spark Connect)
- The token to authenticate with the proxy.
-
-Use SSL (optional, only applies to Spark Connect)
- Whether to use SSL when connecting.
+Kubernetes namespace (optional, only applies to Spark on Kubernetes applications)
+ Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
When specifying the connection in an environment variable, you should specify
it using URI syntax.
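One way to produce that URI without hand-encoding it is Connection.get_uri(); a sketch with placeholder values mirroring the fields above (the "spark" conn_type comes from SparkSubmitHook):

import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="mysparkcluster.com",  # placeholder host
    port=80,
    extra=json.dumps({"deploy-mode": "cluster", "spark-binary": "spark-submit"}),
)
# Suitable for: export AIRFLOW_CONN_SPARK_DEFAULT='<value printed below>'
print(conn.get_uri())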
diff --git a/docs/apache-airflow-providers-apache-spark/index.rst b/docs/apache-airflow-providers-apache-spark/index.rst
index 7bc8959b63..fa5698d61d 100644
--- a/docs/apache-airflow-providers-apache-spark/index.rst
+++ b/docs/apache-airflow-providers-apache-spark/index.rst
@@ -33,7 +33,7 @@
:maxdepth: 1
:caption: Guides
- Connection types <connections/spark>
+ Connection types <connections/index>
Decorators <decorators/pyspark>
Operators <operators>
diff --git a/docs/apache-airflow-providers-apache-spark/operators.rst b/docs/apache-airflow-providers-apache-spark/operators.rst
index 30d23f47cd..f6c20985f2 100644
--- a/docs/apache-airflow-providers-apache-spark/operators.rst
+++ b/docs/apache-airflow-providers-apache-spark/operators.rst
@@ -23,9 +23,9 @@ Prerequisite
------------
* To use :class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
- you must configure :doc:`Spark Connection <connections/spark>`.
+ you must configure :doc:`Spark Connection <connections/spark-submit>`.
* To use :class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator`
- you must configure both :doc:`Spark Connection <connections/spark>`
+ you must configure both :doc:`Spark Connection <connections/spark-submit>`
and :doc:`JDBC connection <apache-airflow-providers-jdbc:connections/jdbc>`.
* :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator`
gets all the configurations from operator parameters.
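Since SparkSqlOperator takes its configuration directly as operator parameters, a hedged usage sketch (the query, master, and connection id are placeholders):

from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

count_rows = SparkSqlOperator(
    task_id="count_rows",
    sql="SELECT COUNT(*) FROM my_table",  # placeholder query
    master="yarn",
    conn_id="spark_sql_default",
)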