Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/05 01:55:13 UTC
[GitHub] [airflow] kaxil opened a new pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
kaxil opened a new pull request #15197:
URL: https://github.com/apache/airflow/pull/15197
This feature was added in https://github.com/apache/airflow/pull/11784, but
it was broken because the executor read `pod_template_override` from `executor_config`
instead of `pod_template_file`.
closes https://github.com/apache/airflow/pull/14199
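A minimal sketch of the bug, using plain dictionaries rather than Airflow's executor code. The two helper names and the template path are hypothetical; only the keys `pod_template_file` and `pod_template_override` come from the description above.

```python
from typing import Any, Dict, Optional


def template_path_buggy(executor_config: Optional[Dict[str, Any]]) -> Optional[str]:
    """Hypothetical stand-in for the broken lookup: reads a key users never set."""
    return (executor_config or {}).get("pod_template_override")


def template_path_fixed(executor_config: Optional[Dict[str, Any]]) -> Optional[str]:
    """Hypothetical stand-in for the corrected lookup: reads the documented key."""
    return (executor_config or {}).get("pod_template_file")


# A task-level executor_config as a user would write it (the path is illustrative).
task_executor_config = {"pod_template_file": "/opt/airflow/templates/basic_template.yaml"}

buggy_result = template_path_buggy(task_executor_config)  # key is absent, so nothing is found
fixed_result = template_path_fixed(task_executor_config)  # returns the per-task template path
```

With the wrong key, the per-task template is silently ignored and the default template is used instead, which is why the problem went unnoticed without a dedicated test.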
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607065015
##########
File path: airflow/kubernetes_executor_templates/basic_template.yaml
##########
@@ -69,8 +69,8 @@ spec:
defaultMode: 420
restartPolicy: Never
terminationGracePeriodSeconds: 30
- serviceAccountName: airflow-worker-serviceaccount
- serviceAccount: airflow-worker-serviceaccount
+ serviceAccountName: airflow-worker
+ serviceAccount: airflow-worker
Review comment:
This change was needed after https://github.com/apache/airflow/commit/23768f694697be1b29ec00b8deeed5777d2538e8
However, since the test did not use `pod_template_file`, we did not notice that it was broken.
[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r606884576
##########
File path: docs/apache-airflow/executor/kubernetes.rst
##########
@@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar.
:end-before: [END task_with_sidecar]
You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.
Review comment:
``pod_override_spec`` never existed
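The intended behavior can be modeled roughly as follows: a per-task `pod_template_file` takes the place of the default template, and `pod_override` is then merged on top. This is a simplified sketch with plain dictionaries, not Airflow's implementation; `TEMPLATES`, `deep_merge`, and `build_pod` are hypothetical names, and the label values are borrowed from the test in this pull request.

```python
import copy
from typing import Any, Dict, Optional

# Hypothetical in-memory "files": template path -> parsed pod definition.
TEMPLATES: Dict[str, Dict[str, Any]] = {
    "default.yaml": {"metadata": {"labels": {"source": "default"}}},
    "basic_template.yaml": {"metadata": {"labels": {"mylabel": "foo"}}},
}


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with override merged in; nested dicts merge key by key."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def build_pod(default_template: str, executor_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Pick the per-task template if given, else the default, then apply pod_override."""
    config = executor_config or {}
    template = config.get("pod_template_file") or default_template
    return deep_merge(TEMPLATES[template], config.get("pod_override", {}))


pod = build_pod(
    "default.yaml",
    {
        "pod_template_file": "basic_template.yaml",
        "pod_override": {"metadata": {"labels": {"release": "stable"}}},
    },
)
```

The resulting labels contain both the template's `mylabel` and the override's `release`, matching the labels the new test expects on the launched pod.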
[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607065015
##########
File path: airflow/kubernetes_executor_templates/basic_template.yaml
##########
@@ -69,8 +69,8 @@ spec:
defaultMode: 420
restartPolicy: Never
terminationGracePeriodSeconds: 30
- serviceAccountName: airflow-worker-serviceaccount
- serviceAccount: airflow-worker-serviceaccount
+ serviceAccountName: airflow-worker
+ serviceAccount: airflow-worker
Review comment:
This change was needed after a Helm chart change: https://github.com/apache/airflow/commit/23768f694697be1b29ec00b8deeed5777d2538e8
However, since the test did not use `pod_template_file`, we did not notice that it was broken.
[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607088711
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=mock.ANY,
Review comment:
I was in two minds about that; I added it back in https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
Review comment:
Done in https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50
[GitHub] [airflow] kaxil merged pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15197:
URL: https://github.com/apache/airflow/pull/15197
[GitHub] [airflow] XD-DENG commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor
Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607011484
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=mock.ANY,
Review comment:
Possible to make the assertion more granular? Container spec is worth testing carefully, e.g. the image used.
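One way to tighten the check, sketched here with stand-in classes instead of the real `kubernetes` models: pull the pod out of the mock's `call_args` and assert on individual container fields. `FakePod`, `FakeContainer`, `launch`, and the image string are hypothetical placeholders, not names from the Airflow code base.

```python
from dataclasses import dataclass, field
from typing import List
from unittest import mock


@dataclass
class FakeContainer:
    """Stand-in for a container spec; only the fields checked below."""
    name: str
    image: str


@dataclass
class FakePod:
    """Stand-in for a pod object holding a list of containers."""
    containers: List[FakeContainer] = field(default_factory=list)


def launch(run_pod_async) -> None:
    """Hypothetical code under test: builds a pod and hands it to the launcher."""
    run_pod_async(FakePod(containers=[FakeContainer(name="base", image="example/airflow:test")]))


mock_run_pod_async = mock.Mock()
launch(mock_run_pod_async)

# Instead of containers=mock.ANY, inspect the pod that was actually passed.
mock_run_pod_async.assert_called_once()
(launched_pod,), _ = mock_run_pod_async.call_args
base = launched_pod.containers[0]
assert base.name == "base"
assert base.image == "example/airflow:test"
```

Checking fields one at a time keeps the failure message specific (for example, a wrong image) while still allowing unrelated fields to vary.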
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
Review comment:
This may be redundant, but to be on the safe side we may want to assert the execution date here as well, rather than just using `mock.ANY`.
We could have something like the following here:
```python
execution_date = datetime.utcnow()
executor.execute_async(
key=('dag', 'task', execution_date, 1),
... ...
```
Then in `mock_run_pod_async.assert_called_once_with()`, have
```python
annotations={
'dag_id': 'dag',
'execution_date': execution_date.isoformat(),
... ...
}
```
and
```python
labels={
'airflow-worker': '5',
'airflow_version': mock.ANY,
'dag_id': 'dag',
'execution_date': pod_generator.datetime_to_label_safe_datestring(execution_date),
... ...
}
```
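Put together, the suggestion amounts to creating the timestamp once and reusing it in both the call and the expected values. The sketch below shows the pattern with `unittest.mock` only; `submit` is a hypothetical stand-in for the executor, and `label_safe` is a local helper written on the assumption that `pod_generator.datetime_to_label_safe_datestring` swaps out characters that are not valid in Kubernetes label values (its exact behavior is not shown in this thread). A fixed timestamp is used instead of `datetime.utcnow()` so the example is deterministic.

```python
from datetime import datetime
from unittest import mock


def label_safe(value: datetime) -> str:
    """Assumed stand-in: colons are not allowed in label values, so swap them out."""
    return value.isoformat().replace(":", "_")


def submit(run_pod_async, key) -> None:
    """Hypothetical code under test: derives pod metadata from the task key."""
    dag_id, task_id, execution_date, try_number = key
    run_pod_async(
        annotations={"dag_id": dag_id, "execution_date": execution_date.isoformat()},
        labels={"dag_id": dag_id, "execution_date": label_safe(execution_date)},
    )


# Create the timestamp once so the expectation can name it exactly.
execution_date = datetime(2021, 4, 5, 1, 55, 13)
mock_run_pod_async = mock.Mock()
submit(mock_run_pod_async, ("dag", "task", execution_date, 1))

# The same variable appears in the expectation, so mock.ANY is no longer needed.
mock_run_pod_async.assert_called_once_with(
    annotations={"dag_id": "dag", "execution_date": execution_date.isoformat()},
    labels={"dag_id": "dag", "execution_date": label_safe(execution_date)},
)
```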