Posted to commits@airflow.apache.org by el...@apache.org on 2023/12/29 12:53:54 UTC

(airflow) branch main updated: change spark connection form and add spark connections docs (#36419)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ed9080ae6a change spark connection form and add spark connections docs (#36419)
ed9080ae6a is described below

commit ed9080ae6a17d7b2478652b676579f162462bb70
Author: shohamy7 <46...@users.noreply.github.com>
AuthorDate: Fri Dec 29 14:53:44 2023 +0200

    change spark connection form and add spark connections docs (#36419)
    
    * change spark connection form and add spark connections docs
    
    * make SQL letter uppercase in spark-sql connection header
    
    * rename from spark to spark-submit and add default values in connection form
---
 airflow/providers/apache/spark/hooks/spark_jdbc.py |  2 +-
 airflow/providers/apache/spark/hooks/spark_sql.py  | 25 +++++++++++
 .../providers/apache/spark/hooks/spark_submit.py   | 45 +++++++++++++++++---
 .../providers/apache/spark/operators/spark_jdbc.py |  2 +-
 .../apache/spark/operators/spark_submit.py         |  2 +-
 .../connections/index.rst                          | 28 +++++++++++++
 .../connections/{spark.rst => spark-connect.rst}   | 32 +++------------
 .../connections/spark-sql.rst                      | 48 ++++++++++++++++++++++
 .../connections/{spark.rst => spark-submit.rst}    | 31 ++++++--------
 .../index.rst                                      |  2 +-
 .../operators.rst                                  |  4 +-
 11 files changed, 166 insertions(+), 55 deletions(-)

diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 8c8d02f1ec..b904ca4260 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -29,7 +29,7 @@ class SparkJDBCHook(SparkSubmitHook):
     Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark.
 
     :param spark_app_name: Name of the job (default airflow-spark-jdbc)
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
         as configured in Airflow administration
     :param spark_conf: Any additional Spark configuration properties
     :param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py
index 41dc741ccd..46eec49f30 100644
--- a/airflow/providers/apache/spark/hooks/spark_sql.py
+++ b/airflow/providers/apache/spark/hooks/spark_sql.py
@@ -54,6 +54,31 @@ class SparkSqlHook(BaseHook):
     conn_type = "spark_sql"
     hook_name = "Spark SQL"
 
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom field behaviour."""
+        return {
+            "hidden_fields": ["schema", "login", "password", "extra"],
+            "relabeling": {},
+        }
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Returns connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+        from wtforms.validators import Optional
+
+        return {
+            "queue": StringField(
+                lazy_gettext("YARN queue"),
+                widget=BS3TextFieldWidget(),
+                description="Default YARN queue to use",
+                validators=[Optional()],
+            )
+        }
+
     def __init__(
         self,
         sql: str,
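
Values typed into these custom connection-form fields are persisted in the connection's ``extra`` JSON under the widget key. As a minimal sketch (assuming a connection with id ``spark_sql_default`` already exists), the queue can be read back like this:

.. code-block:: python

   from airflow.hooks.base import BaseHook

   # Minimal sketch: custom connection-form fields are stored in the
   # connection's "extra" JSON under the widget key ("queue" here).
   # Assumes a connection with conn_id "spark_sql_default" already exists.
   conn = BaseHook.get_connection("spark_sql_default")
   yarn_queue = conn.extra_dejson.get("queue")  # None if the field was left empty
   print(f"Configured YARN queue: {yarn_queue}")
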
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index b495cafeb6..b96d992bba 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,7 +33,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 with contextlib.suppress(ImportError, NameError):
     from airflow.providers.cncf.kubernetes import kube_client
 
-ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"]
+DEFAULT_SPARK_BINARY = "spark-submit"
+ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]
 
 
 class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -41,7 +42,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
 
     :param conf: Arbitrary Spark configuration properties
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
         in Airflow administration. When an invalid connection_id is supplied, it will default
         to yarn.
     :param files: Upload additional files to the executor running the job, separated by a
@@ -98,10 +99,44 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         """Return custom field behaviour."""
         return {
-            "hidden_fields": ["schema", "login", "password"],
+            "hidden_fields": ["schema", "login", "password", "extra"],
             "relabeling": {},
         }
 
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Returns connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+        from wtforms.validators import Optional, any_of
+
+        return {
+            "queue": StringField(
+                lazy_gettext("YARN queue"),
+                widget=BS3TextFieldWidget(),
+                description="Default YARN queue to use",
+                validators=[Optional()],
+            ),
+            "deploy-mode": StringField(
+                lazy_gettext("Deploy mode"),
+                widget=BS3TextFieldWidget(),
+                description="Must be client or cluster",
+                validators=[any_of(["client", "cluster"])],
+                default="client",
+            ),
+            "spark-binary": StringField(
+                lazy_gettext("Spark binary"),
+                widget=BS3TextFieldWidget(),
+                description=f"Must be one of: {', '.join(ALLOWED_SPARK_BINARIES)}",
+                validators=[any_of(ALLOWED_SPARK_BINARIES)],
+                default=DEFAULT_SPARK_BINARY,
+            ),
+            "namespace": StringField(
+                lazy_gettext("Kubernetes namespace"), widget=BS3TextFieldWidget(), validators=[Optional()]
+            ),
+        }
+
     def __init__(
         self,
         conf: dict[str, Any] | None = None,
@@ -198,7 +233,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             "master": "yarn",
             "queue": None,
             "deploy_mode": None,
-            "spark_binary": self.spark_binary or "spark-submit",
+            "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
             "namespace": None,
         }
 
@@ -216,7 +251,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             conn_data["queue"] = self._queue if self._queue else extra.get("queue")
             conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode")
             if not self.spark_binary:
-                self.spark_binary = extra.get("spark-binary", "spark-submit")
+                self.spark_binary = extra.get("spark-binary", DEFAULT_SPARK_BINARY)
                 if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES:
                     raise RuntimeError(
                         f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
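
With these changes ``SparkSubmitHook`` resolves ``queue``, ``deploy-mode``, ``spark-binary`` and ``namespace`` from the connection's extras, falling back to ``DEFAULT_SPARK_BINARY`` when no binary is given. As an illustrative sketch (host, queue and binary values are placeholders, and ``spark`` is assumed to be the Spark Submit connection type identifier), such a connection can be built programmatically:

.. code-block:: python

   import json

   from airflow.models import Connection

   # Illustrative sketch: a Spark Submit connection whose extras carry the
   # fields exposed by the new connection form. All values are placeholders.
   spark_submit_conn = Connection(
       conn_id="spark_default",
       conn_type="spark",  # assumed Spark Submit connection type identifier
       host="yarn",
       extra=json.dumps(
           {
               "queue": "root.default",
               "deploy-mode": "cluster",
               "spark-binary": "spark3-submit",  # must be one of ALLOWED_SPARK_BINARIES
           }
       ),
   )

The hook then picks these values up via ``extra.get(...)`` exactly as in the hunk above; the object still needs to be stored (metadata DB, environment variable or secrets backend) before the hook can resolve it.
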
diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py
index 4b4dd648a6..e5ff5f9c65 100644
--- a/airflow/providers/apache/spark/operators/spark_jdbc.py
+++ b/airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -37,7 +37,7 @@ class SparkJDBCOperator(SparkSubmitOperator):
         :ref:`howto/operator:SparkJDBCOperator`
 
     :param spark_app_name: Name of the job (default airflow-spark-jdbc)
-    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
+    :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
         as configured in Airflow administration
     :param spark_conf: Any additional Spark configuration properties
     :param spark_py_files: Additional python files used (.zip, .egg, or .py)
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index be2f2d0ac5..bd8480b815 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ class SparkSubmitOperator(BaseOperator):
 
     :param application: The application that submitted as a job, either jar or py file. (templated)
     :param conf: Arbitrary Spark configuration properties (templated)
-    :param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
+    :param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
         in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
     :param files: Upload additional files to the executor running the job, separated by a
                   comma. Files will be placed in the working directory of each executor.
diff --git a/docs/apache-airflow-providers-apache-spark/connections/index.rst b/docs/apache-airflow-providers-apache-spark/connections/index.rst
new file mode 100644
index 0000000000..71716ec9d6
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/index.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Apache Spark Connections
+========================
+
+
+.. toctree::
+    :maxdepth: 1
+    :glob:
+
+    *
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
similarity index 51%
copy from docs/apache-airflow-providers-apache-spark/connections/spark.rst
copy to docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
index 05b92ce75c..aa5ef07157 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst
@@ -17,34 +17,26 @@
 
 
 
-.. _howto/connection:spark:
+.. _howto/connection:spark-connect:
 
-Apache Spark Connection
-=======================
+Apache Spark Connect Connection
+===============================
 
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Connect connection type enables connection to Apache Spark via the Spark Connect interface.
 
 Default Connection IDs
 ----------------------
 
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+The Spark Connect hook uses ``spark_connect_default`` by default.
 
 Configuring the Connection
 --------------------------
 Host (required)
-    The host to connect to, it can be ``local``, ``yarn`` or an URL.
+    The host to connect to; it should be a valid hostname.
 
 Port (optional)
     Specify the port in case of host be an URL.
 
-Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
-
-    * ``queue`` - The name of the YARN queue to which the application is submitted.
-    * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
-    * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
-    * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
-
 User ID (optional, only applies to Spark Connect)
     The user ID to authenticate with the proxy.
 
@@ -54,18 +46,6 @@ Token (optional, only applies to Spark Connect)
 Use SSL (optional, only applies to Spark Connect)
     Whether to use SSL when connecting.
 
-When specifying the connection in environment variable you should specify
-it using URI syntax.
-
-Note that all components of the URI should be URL-encoded. The URI and the mongo
-connection string are not the same.
-
-For example:
-
-.. code-block:: bash
-
-   export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'
-
 .. warning::
 
   Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
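
For the Spark Connect fields described above, one hedged way to supply the connection is through an environment variable using Airflow's JSON connection format (Airflow 2.3+). The ``spark_connect`` type identifier, the port, and the mapping of User ID/Token onto ``login``/``password`` are assumptions, not taken from this commit:

.. code-block:: python

   import json
   import os

   # Hedged sketch: expose a Spark Connect connection via an environment
   # variable. The conn_type value and the login/password mapping are
   # assumptions -- check SparkConnectHook in the provider for the exact names.
   os.environ["AIRFLOW_CONN_SPARK_CONNECT_DEFAULT"] = json.dumps(
       {
           "conn_type": "spark_connect",
           "host": "spark-connect.example.com",
           "port": 15002,  # placeholder; 15002 is the usual Spark Connect port
           "login": "example-user",  # "User ID" used to authenticate with the proxy
           "password": "example-token",  # "Token" used to authenticate with the proxy
       }
   )
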
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
new file mode 100644
index 0000000000..c4e4c606de
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst
@@ -0,0 +1,48 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/connection:spark-sql:
+
+Apache Spark SQL Connection
+===========================
+
+The Apache Spark SQL connection type enables connection to Apache Spark via the ``spark-sql`` command.
+
+Default Connection IDs
+----------------------
+
+SparkSqlHook uses ``spark_sql_default`` by default.
+
+Configuring the Connection
+--------------------------
+Host (required)
+    The host to connect to; it can be ``local``, ``yarn``, or a URL.
+
+Port (optional)
+    Specify the port if the host is a URL.
+
+YARN Queue
+    The name of the YARN queue to which the application is submitted.
+
+.. warning::
+
+  Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
+  establish communication with external servers. It's crucial to understand that directing the connection towards a
+  malicious server can lead to significant security vulnerabilities, including the risk of encountering
+  Remote Code Execution (RCE) attacks.
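
A short, hedged usage sketch to go with the page above: the ``spark_sql`` connection type identifier is taken from ``SparkSqlHook.conn_type`` in this commit, while the host, queue value and the ``conn_id`` keyword are placeholders or assumptions:

.. code-block:: python

   import json

   from airflow.models import Connection
   from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

   # Hedged sketch: a Spark SQL connection matching the fields documented
   # above, plus a hook pointed at it. Host and queue values are placeholders;
   # the connection still has to be stored (metadata DB or env var) before
   # the hook can resolve it.
   spark_sql_conn = Connection(
       conn_id="spark_sql_default",
       conn_type="spark_sql",
       host="yarn",
       extra=json.dumps({"queue": "root.default"}),
   )
   hook = SparkSqlHook(sql="SELECT 1", conn_id="spark_sql_default")  # conn_id parameter name is assumed
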
diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
similarity index 58%
rename from docs/apache-airflow-providers-apache-spark/connections/spark.rst
rename to docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
index 05b92ce75c..f38a2908ba 100644
--- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst
+++ b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst
@@ -17,17 +17,17 @@
 
 
 
-.. _howto/connection:spark:
+.. _howto/connection:spark-submit:
 
-Apache Spark Connection
-=======================
+Apache Spark Submit Connection
+==============================
 
-The Apache Spark connection type enables connection to Apache Spark.
+The Apache Spark Submit connection type enables connection to Apache Spark via the ``spark-submit`` command.
 
 Default Connection IDs
 ----------------------
 
-Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
+Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default.
 
 Configuring the Connection
 --------------------------
@@ -37,22 +37,17 @@ Host (required)
 Port (optional)
     Specify the port in case of host be an URL.
 
-Extra (optional)
-    Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
+YARN Queue (optional, only applies to Spark on YARN applications)
+    The name of the YARN queue to which the application is submitted.
 
-    * ``queue`` - The name of the YARN queue to which the application is submitted.
-    * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
-    * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
-    * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
+Deploy mode (optional)
+    Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
 
-User ID (optional, only applies to Spark Connect)
-    The user ID to authenticate with the proxy.
+Spark binary (optional)
+    The command to use for Spark submit. Some distributions may use ``spark2-submit``. Defaults to ``spark-submit``. Only ``spark-submit``, ``spark2-submit``, or ``spark3-submit`` are allowed as values.
 
-Token (optional, only applies to Spark Connect)
-    The token to authenticate with the proxy.
-
-Use SSL (optional, only applies to Spark Connect)
-    Whether to use SSL when connecting.
+Kubernetes namespace (optional, only applies to Spark on Kubernetes applications)
+    Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
 
 When specifying the connection in environment variable you should specify
 it using URI syntax.
diff --git a/docs/apache-airflow-providers-apache-spark/index.rst b/docs/apache-airflow-providers-apache-spark/index.rst
index 7bc8959b63..fa5698d61d 100644
--- a/docs/apache-airflow-providers-apache-spark/index.rst
+++ b/docs/apache-airflow-providers-apache-spark/index.rst
@@ -33,7 +33,7 @@
     :maxdepth: 1
     :caption: Guides
 
-    Connection types <connections/spark>
+    Connection types <connections/index>
     Decorators <decorators/pyspark>
     Operators <operators>
 
diff --git a/docs/apache-airflow-providers-apache-spark/operators.rst b/docs/apache-airflow-providers-apache-spark/operators.rst
index 30d23f47cd..f6c20985f2 100644
--- a/docs/apache-airflow-providers-apache-spark/operators.rst
+++ b/docs/apache-airflow-providers-apache-spark/operators.rst
@@ -23,9 +23,9 @@ Prerequisite
 ------------
 
 * To use :class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
-  you must configure :doc:`Spark Connection <connections/spark>`.
+  you must configure :doc:`Spark Connection <connections/spark-submit>`.
 * To use :class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator`
-  you must configure both :doc:`Spark Connection <connections/spark>`
+  you must configure both :doc:`Spark Connection <connections/spark-submit>`
   and :doc:`JDBC connection <apache-airflow-providers-jdbc:connections/jdbc>`.
 * :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator`
   gets all the configurations from operator parameters.