Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/05 01:55:13 UTC

[GitHub] [airflow] kaxil opened a new pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

kaxil opened a new pull request #15197:
URL: https://github.com/apache/airflow/pull/15197


   This feature was added in https://github.com/apache/airflow/pull/11784, but
   it was broken because it read `pod_template_override` from `executor_config`
   instead of `pod_template_file`.
   
   closes https://github.com/apache/airflow/pull/14199
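A minimal sketch of the bug and the fix, assuming `executor_config` is a plain dict and that the `airflow.cfg` default template path is passed in explicitly. The helpers `resolve_template_buggy` and `resolve_template_fixed` and the example paths are hypothetical, not Airflow's actual code. The point is that looking up a key that is never set (`pod_template_override`) silently falls back to the default, so the per-task template is ignored without any error.

```python
from typing import Any, Dict, Optional


def resolve_template_buggy(executor_config: Dict[str, Any], default: Optional[str]) -> Optional[str]:
    # Hypothetical reproduction of the bug: users set "pod_template_file",
    # but this looks up "pod_template_override", which is never present,
    # so the default from airflow.cfg is always returned.
    return executor_config.get("pod_template_override") or default


def resolve_template_fixed(executor_config: Dict[str, Any], default: Optional[str]) -> Optional[str]:
    # Hypothetical fix: read the key users actually pass, and fall back to
    # the airflow.cfg default only when no per-task template is given.
    return executor_config.get("pod_template_file") or default
```

For example, with `{"pod_template_file": "/t/basic.yaml"}` the buggy version returns the default path while the fixed version returns `/t/basic.yaml`.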
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org




[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r606884576



##########
File path: docs/apache-airflow/executor/kubernetes.rst
##########
@@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar.
     :end-before: [END task_with_sidecar]
 
 You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.

Review comment:
       ``pod_override_spec`` never existed; the override key in ``executor_config`` is ``pod_override``.
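To illustrate the order the docs describe, here is a simplified sketch that uses plain dicts instead of `k8s.V1Pod` objects: a per-task template replaces the `airflow.cfg` default as the base, and `pod_override` is then applied on top of it. The function `build_pod_labels` is hypothetical and only merges labels shallowly; Airflow's own reconciliation works on whole pod specs. The example values echo the labels asserted in this PR's test (`mylabel: foo`, presumably from the template, and `release: stable` from `pod_override`).

```python
from typing import Dict, Optional


def build_pod_labels(
    default_template_labels: Dict[str, str],
    task_template_labels: Optional[Dict[str, str]],
    pod_override_labels: Optional[Dict[str, str]],
) -> Dict[str, str]:
    # Step 1: a per-task template, if given, replaces the airflow.cfg default entirely.
    base = dict(task_template_labels if task_template_labels is not None else default_template_labels)
    # Step 2: pod_override is layered on top; on key conflicts the override wins.
    base.update(pod_override_labels or {})
    return base
```

With a default template labelled `{"from_default": "x"}`, a per-task template `{"mylabel": "foo"}` and an override `{"release": "stable"}`, the result is `{"mylabel": "foo", "release": "stable"}`: nothing from the default survives once a per-task template is supplied.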







[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607065015



##########
File path: airflow/kubernetes_executor_templates/basic_template.yaml
##########
@@ -69,8 +69,8 @@ spec:
         defaultMode: 420
   restartPolicy: Never
   terminationGracePeriodSeconds: 30
-  serviceAccountName: airflow-worker-serviceaccount
-  serviceAccount: airflow-worker-serviceaccount
+  serviceAccountName: airflow-worker
+  serviceAccount: airflow-worker

Review comment:
       This change was needed after the Helm chart change in https://github.com/apache/airflow/commit/23768f694697be1b29ec00b8deeed5777d2538e8
   
   However, since the test was not using `pod_template_file`, we didn't notice that it was broken.
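A test that actually reads the template would have caught this drift. A minimal sketch, assuming a simple template in which `serviceAccountName` and `serviceAccount` each appear as `key: value` on one line; a real check should use a YAML parser. In the Kubernetes pod spec, `serviceAccount` is a deprecated alias of `serviceAccountName`, so both should agree. The helper `service_account_fields` is hypothetical, and the expected name `airflow-worker` is taken from the diff above.

```python
import re
from typing import Dict

# Matches lines such as "  serviceAccountName: airflow-worker" (simple YAML only).
_SA_LINE = re.compile(r"^\s*(serviceAccountName|serviceAccount)\s*:\s*(\S+)\s*$")


def service_account_fields(template_text: str) -> Dict[str, str]:
    """Return the serviceAccount/serviceAccountName values found in a pod template."""
    found: Dict[str, str] = {}
    for line in template_text.splitlines():
        match = _SA_LINE.match(line)
        if match:
            found[match.group(1)] = match.group(2)
    return found


TEMPLATE = """\
spec:
  restartPolicy: Never
  terminationGracePeriodSeconds: 30
  serviceAccountName: airflow-worker
  serviceAccount: airflow-worker
"""

# Both fields should agree and match the chart's service account name
# (assumed here to be "airflow-worker", per the diff).
fields = service_account_fields(TEMPLATE)
assert fields == {"serviceAccountName": "airflow-worker", "serviceAccount": "airflow-worker"}
```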







[GitHub] [airflow] kaxil commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607088711



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=mock.ANY,

Review comment:
       I was in two minds about that; I added it back in https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),

Review comment:
       Done in https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50







[GitHub] [airflow] kaxil merged pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15197:
URL: https://github.com/apache/airflow/pull/15197


   





[GitHub] [airflow] XD-DENG commented on a change in pull request #15197: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607011484



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=mock.ANY,

Review comment:
       Is it possible to make the assertion more granular? The container spec is worth testing carefully, e.g. the image used.
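To show why `mock.ANY` can hide regressions, here is a small self-contained sketch using only `unittest.mock`: with `containers` set to `ANY`, the assertion passes even though the pod was launched with the wrong image, while a granular assertion on the container catches it. The `launch` mock and the image names are hypothetical stand-ins for `run_pod_async` and the real pod spec.

```python
from unittest import mock

# Hypothetical stand-in for PodLauncher.run_pod_async.
launch = mock.Mock()

# Simulate the executor launching a pod with an unexpected image.
launch({"containers": [{"name": "base", "image": "wrong-image:latest"}]})

# Passes: ANY compares equal to anything, so the wrong image goes unnoticed.
launch.assert_called_once_with({"containers": mock.ANY})

# A granular check on the container spec catches the mismatch.
try:
    launch.assert_called_once_with(
        {"containers": [{"name": "base", "image": "expected-image:1.0"}]}
    )
    caught = False
except AssertionError:
    caught = True

print(caught)  # True
```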

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),

Review comment:
       This may be redundant, but to be on the safe side, we may want to assert the execution date here as well, rather than just using `mock.ANY`.
   
   We could have something like the following here:
   
   ```python
   execution_date = datetime.utcnow()
   executor.execute_async(
       key=('dag', 'task', execution_date, 1),
       ... ...
   ```
   
   Then, in `mock_run_pod_async.assert_called_once_with()`, use:
   ```python
   annotations={
       'dag_id': 'dag',
       'execution_date': execution_date.isoformat(),
       ... ...
   }
   ```
   
   and 
   
   ```python
   labels={
       'airflow-worker': '5',
       'airflow_version': mock.ANY,
       'dag_id': 'dag',
       'execution_date': pod_generator.datetime_to_label_safe_datestring(execution_date),
       ... ...
   }
   ```
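Annotations can hold the raw `isoformat()` string, but labels cannot: Kubernetes label values may only contain alphanumerics, `-`, `_` and `.`, must start and end with an alphanumeric character, and are limited to 63 characters. The sketch below assumes `pod_generator.datetime_to_label_safe_datestring` replaces `:` with `_` and `+` with `_plus_`; `to_label_safe` is a hypothetical re-implementation, so check the Airflow source for the exact behaviour. The regex follows the Kubernetes label-value rules.

```python
import re
from datetime import datetime, timezone

# Kubernetes label-value syntax: empty, or at most 63 chars, alphanumeric at
# both ends, with '-', '_' and '.' allowed in between.
LABEL_VALUE = re.compile(r"^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$")


def is_valid_label_value(value: str) -> bool:
    return len(value) <= 63 and LABEL_VALUE.match(value) is not None


def to_label_safe(dt: datetime) -> str:
    # Assumed to mirror pod_generator.datetime_to_label_safe_datestring.
    return dt.isoformat().replace(":", "_").replace("+", "_plus_")


execution_date = datetime(2021, 4, 5, 1, 55, 13, tzinfo=timezone.utc)
annotation = execution_date.isoformat()  # '2021-04-05T01:55:13+00:00'
label = to_label_safe(execution_date)    # '2021-04-05T01_55_13_plus_00_00'

print(is_valid_label_value(annotation))  # False: ':' and '+' are not allowed
print(is_valid_label_value(label))       # True
```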



