Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/01/06 11:07:00 UTC

[jira] [Commented] (AIRFLOW-6212) SparkSubmitHook failed to execute spark-submit to standalone cluster

    [ https://issues.apache.org/jira/browse/AIRFLOW-6212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008732#comment-17008732 ] 

ASF GitHub Bot commented on AIRFLOW-6212:
-----------------------------------------

albertusk95 commented on pull request #7075: [AIRFLOW-6212] SparkSubmitHook resolve connection
URL: https://github.com/apache/airflow/pull/7075
 
 
   **Problem**
   
   I tried to use `SparkSubmitOperator` with a standalone cluster. Unfortunately, the `spark-submit` task failed with the following exception.
   ```
   airflow.exceptions.AirflowException: Cannot execute: [path/to/spark-submit, '--master', host:port, job_file.py]
   ```
   
   The first thing that came to mind was why the master address lacked the `spark://` prefix; it should be `--master spark://host:port`. A quick check of the source code showed that adding the scheme isn't handled. Please take a look at the following code snippet: [source](https://github.com/apache/airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L171).
   
   After reviewing the subsequent method calls, it turned out that the driver status tracking feature is never used because of the above bug. Look at the following method.
   
   ```python
   def _resolve_should_track_driver_status(self):
   	"""
   	Determines whether or not this hook should poll the Spark driver status through subsequent spark-submit status requests after the initial spark-submit request
   	:return: if the driver status should be tracked
   	"""
   	return ('spark://' in self._connection['master'] and self._connection['deploy_mode'] == 'cluster')
   ```
   
   The above method will always return `False` because the Spark master's address built by the hook doesn't contain the `spark://` scheme.
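
To illustrate, here is a minimal sketch of the same check as a standalone function. The function name, the dictionaries, and the `host:7077` address are placeholders for illustration, not the hook's actual code.

```python
def should_track_driver_status(connection):
    """Standalone copy of the check quoted above (illustrative only)."""
    return ('spark://' in connection['master']
            and connection['deploy_mode'] == 'cluster')


# Master as the hook builds it today: the scheme has been lost.
without_scheme = {'master': 'host:7077', 'deploy_mode': 'cluster'}
# Master as a Spark standalone cluster expects it.
with_scheme = {'master': 'spark://host:7077', 'deploy_mode': 'cluster'}

print(should_track_driver_status(without_scheme))  # False
print(should_track_driver_status(with_scheme))     # True
```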
   
   Later on, I investigated the `Connection` module (_airflow.models.connection_) further and found that if we provide a URI (e.g. _spark://host:port_), the attributes of the `Connection` object are derived via URI parsing.
   
   When parsing the host, the resulting value is only the hostname without the scheme, which makes this a fairly critical bug.
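
The effect can be reproduced with the standard library alone. The sketch below assumes the `Connection` parsing behaves like `urllib.parse.urlparse`; the `host:7077` address is a placeholder.

```python
from urllib.parse import urlparse

# Parse a standalone master URI the way a generic URI parser would.
parsed = urlparse('spark://host:7077')

print(parsed.scheme)    # spark
print(parsed.hostname)  # host
print(parsed.port)      # 7077

# Rebuilding the master from host and port alone drops the scheme.
master = "{}:{}".format(parsed.hostname, parsed.port)
print(master)  # host:7077
```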
   
   **Proposed Solution**
   
   I think we don't really need the whole URI. When we store the connection data as an environment variable, we could specify the URI parts as JSON instead. The main purpose of this approach is to avoid the URI parsing problem.
   
   In this case, the `conn_id` will still be preserved.
   
   Take a look at the following example (`conn_id` = "spark_default"). For simplicity, let's presume that `extra` is in JSON form.
   
   ```
   AIRFLOW_CONN_SPARK_DEFAULT='{"conn_type": <conn_type>, "host":<host>, "port":<port>, "schema":<schema>, "extra":<extra>}'
   ```
   
   Although this solution avoids the incorrect results returned by URI parsing, one needs to strictly ensure that each attribute (host, port, scheme, etc.) stores the relevant value. I think that's much easier than writing a correct URI parser. Moreover, with this technique the connection data builder follows the same pattern in both database and environment variable modes (both use a structured data specification).
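
A rough sketch of how such a JSON value might be decoded is shown below. `parse_connection_json` is a hypothetical helper, not an existing Airflow function, and storing the scheme inside `host` is just one possible way to fill in the attributes.

```python
import json


def parse_connection_json(raw):
    """Hypothetical helper: decode a JSON connection spec into a dict."""
    spec = json.loads(raw)
    return {
        'conn_type': spec.get('conn_type'),
        'host': spec.get('host'),
        'port': spec.get('port'),
        'schema': spec.get('schema'),
        'extra': spec.get('extra') or {},
    }


# Example value for AIRFLOW_CONN_SPARK_DEFAULT (assumed, for illustration).
raw = '{"conn_type": "spark", "host": "spark://host", "port": 7077}'
conn = parse_connection_json(raw)

# No URI parsing is involved, so the scheme kept in "host" survives.
master = "{}:{}".format(conn['host'], conn['port'])
print(master)  # spark://host:7077
```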
   
   ---
   Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6212
   
   - [X] Description above provides context of the change
   - [X] Commit message starts with `[AIRFLOW-NNNN]`, where AIRFLOW-NNNN = JIRA ID*
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   (*) For document-only changes, no JIRA issue is needed. Commit message starts `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> SparkSubmitHook failed to execute spark-submit to standalone cluster
> --------------------------------------------------------------------
>
>                 Key: AIRFLOW-6212
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6212
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: hooks, operators
>    Affects Versions: 1.10.6
>            Reporter: Albertus Kelvin
>            Assignee: xifeng
>            Priority: Trivial
>
> I was trying to submit a pyspark job with spark-submit using SparkSubmitOperator. I already set up the master appropriately via environment variable (AIRFLOW_CONN_SPARK_DEFAULT). The value was something like *spark://host:port*.
> However, an exception occurred: 
> {noformat}
> airflow.exceptions.AirflowException: Cannot execute: ['path/to/spark-submit', '--master', 'host:port', 'job.py']
> {noformat}
> Turns out that the master should have *spark://* preceding the host:port. I checked the code and found that this wasn't handled.
> {code:python}
> conn = self.get_connection(self._conn_id)
> if conn.port:
>     conn_data['master'] = "{}:{}".format(conn.host, conn.port)
> else:
>     conn_data['master'] = conn.host
> {code}
> I think the protocol should be added like the following.
> {code:python}
> conn_data['master'] = "spark://{}:{}".format(conn.host, conn.port)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)