Posted to commits@airflow.apache.org by "xifeng (Jira)" <ji...@apache.org> on 2019/12/11 09:09:00 UTC
[jira] [Commented] (AIRFLOW-6214) Spark driver status tracking for
standalone, YARN, Mesos and K8s with cluster deploy mode
[ https://issues.apache.org/jira/browse/AIRFLOW-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16993330#comment-16993330 ]
xifeng commented on AIRFLOW-6214:
---------------------------------
I think you are supposed to pass the `spark://` scheme in the `host` param. As far as I can see, there are some demos of this in `tests/contrib/hooks/test_spark_submit_hook.py`:
```
db.merge_conn(
    Connection(
        conn_id='spark_standalone_cluster_client_mode', conn_type='spark',
        host='spark://spark-standalone-master:6066',
        extra='{"spark-home": "/path/to/spark_home", "deploy-mode": "client"}')
)
```
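To illustrate why the scheme matters, here is a minimal sketch of the check performed by `_resolve_should_track_driver_status` (the helper name below is illustrative, not the actual hook method; it mirrors the condition quoted in the issue):

```python
# Hypothetical standalone version of the check in SparkSubmitHook:
# driver status polling is only attempted for a standalone master
# (spark:// scheme) combined with cluster deploy mode.
def should_track_driver_status(master, deploy_mode):
    return 'spark://' in master and deploy_mode == 'cluster'

# With the scheme included in `host`, the check passes:
should_track_driver_status('spark://spark-standalone-master:6066', 'cluster')  # True
# Without the scheme it fails, which is the behavior reported in this issue:
should_track_driver_status('spark-standalone-master:6066', 'cluster')  # False
```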
> Spark driver status tracking for standalone, YARN, Mesos and K8s with cluster deploy mode
> -----------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6214
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6214
> Project: Apache Airflow
> Issue Type: Improvement
> Components: hooks, operators
> Affects Versions: 1.10.6
> Reporter: Albertus Kelvin
> Assignee: xifeng
> Priority: Minor
>
> Based on the following code snippet:
> {code:python}
> def _resolve_should_track_driver_status(self):
>     return ('spark://' in self._connection['master'] and
>             self._connection['deploy_mode'] == 'cluster')
> {code}
>
> It seems that the above code will always return *False*, because the master address for a standalone cluster doesn't contain *spark://*, as shown in the code snippet below.
> {code:python}
> conn = self.get_connection(self._conn_id)
> if conn.port:
>     conn_data['master'] = "{}:{}".format(conn.host, conn.port)
> else:
>     conn_data['master'] = conn.host
> {code}
> Additionally, I think this driver status tracker should also be enabled for mesos and kubernetes with cluster mode since the *--status* argument supports all of these cluster managers. Refer to [this|https://github.com/apache/spark/blob/be867e8a9ee8fc5e4831521770f51793e9265550/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala#L543].
> For YARN cluster mode, I think we can use YARN's own CLI, e.g. *yarn application -status <ApplicationID>*.
> Therefore, the *_build_track_driver_status_command* method should be updated accordingly, for example:
> {code:python}
> def _build_track_driver_status_command(self):
>     # The driver id so we can poll for its status
>     if not self._driver_id:
>         raise AirflowException(
>             "Invalid status: attempted to poll driver "
>             "status but no driver id is known. Giving up.")
>     if (self._connection['master'].startswith("spark://") or
>             self._connection['master'].startswith("mesos://") or
>             self._connection['master'].startswith("k8s://")):
>         # standalone, mesos, kubernetes
>         connection_cmd = self._get_spark_binary_path()
>         connection_cmd += ["--master", self._connection['master']]
>         connection_cmd += ["--status", self._driver_id]
>     else:
>         # yarn: each token is a separate list element so the command
>         # can be passed to a subprocess without a shell
>         connection_cmd = ["yarn", "application", "-status"]
>         connection_cmd += [self._driver_id]
>     self.log.debug("Poll driver status cmd: %s", connection_cmd)
>     return connection_cmd
> {code}
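The dispatch proposed in the quoted snippet can be sketched as a standalone function (the helper name and `spark_binary` default below are illustrative assumptions, not the actual SparkSubmitHook API):

```python
# Hedged sketch: choose the status command based on the master URL scheme.
def build_status_command(master, driver_id, spark_binary='spark-submit'):
    if master.startswith(('spark://', 'mesos://', 'k8s://')):
        # standalone / mesos / kubernetes: spark-submit supports --status
        return [spark_binary, '--master', master, '--status', driver_id]
    # yarn: fall back to the YARN CLI, one token per list element
    return ['yarn', 'application', '-status', driver_id]
```

A YARN application id (e.g. `application_1234_0001`) would be substituted for the standalone driver id in the YARN branch.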
--
This message was sent by Atlassian Jira
(v8.3.4#803005)