Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/14 17:44:35 UTC

[GitHub] [airflow] jedcunningham opened a new pull request #15373: Require `name` with KubernetesPodOperator

jedcunningham opened a new pull request #15373:
URL: https://github.com/apache/airflow/pull/15373


   We will require a `name` when using KubernetesPodOperator (KPO) without
   `pod_template_file` or `full_pod_spec`. Previously, it would use `name`
   literally as the pod name, without randomizing it, which is likely to
   confuse users at runtime. Now an exception is raised at parse time instead.
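   The rule described above can be sketched as a standalone function. This is a
   minimal sketch, not the operator's code: `resolve_pod_name` is a hypothetical
   name, `AirflowException` is redefined locally so the snippet runs without
   Airflow installed, and nothing else the real `_set_name` method may do is
   reproduced.

```python
from typing import Optional


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


def resolve_pod_name(
    name: Optional[str],
    pod_template_file: Optional[str] = None,
    full_pod_spec: Optional[object] = None,
) -> Optional[str]:
    """Hypothetical helper mirroring the name rule added by this PR."""
    if name is None:
        if pod_template_file or full_pod_spec:
            # The template file or pod spec is expected to supply metadata.name.
            return None
        # Fail when the operator is constructed (i.e. while the DAG is parsed),
        # instead of launching a pod with a confusing, non-random name later.
        raise AirflowException(
            "`name` is required unless `pod_template_file` or `full_pod_spec` is set"
        )
    return name


if __name__ == "__main__":
    print(resolve_pod_name("hello"))                              # hello
    print(resolve_pod_name(None, pod_template_file="pod.yaml"))   # None
    try:
        resolve_pod_name(None)
    except AirflowException as err:
        print(f"error: {err}")
```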
   
   This also adds test coverage around `pod_template_file` and
   `full_pod_spec`.
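   The new `full_pod_spec` tests check two behaviours: with no overriding
   kwargs the spec's fields are used unchanged, and explicit operator kwargs
   (for example `image`) take precedence over the spec. The sketch below is a
   simplified model of that precedence only; `PodFields` and
   `merge_with_precedence` are hypothetical, it is not Airflow's pod
   reconciliation logic, and it ignores the name randomization the operator
   applies to an explicit `name`.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class PodFields:
    """Hypothetical, flattened view of a few pod fields the tests inspect."""
    name: Optional[str] = None
    namespace: Optional[str] = None
    image: Optional[str] = None


def merge_with_precedence(spec: PodFields, kwargs: PodFields) -> PodFields:
    """Take each field from `kwargs` when it is set, otherwise keep `spec`'s value."""
    overrides = {
        f.name: getattr(kwargs, f.name)
        for f in fields(kwargs)
        if getattr(kwargs, f.name) is not None
    }
    # replace() builds a new frozen instance with only the overridden fields changed.
    return replace(spec, **overrides)


if __name__ == "__main__":
    spec = PodFields(name="hello", namespace="mynamespace", image="ubuntu:16.04")
    print(merge_with_precedence(spec, PodFields()))
    print(merge_with_precedence(spec, PodFields(image="some.custom.image:andtag")))
```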
   
   closes: #15326


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil merged pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15373:
URL: https://github.com/apache/airflow/pull/15373


   





[GitHub] [airflow] jedcunningham commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613488004



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -234,6 +235,133 @@ def test_randomize_pod_name(self, mock_client, monitor_mock, start_mock):
         assert start_mock.call_args[0][0].metadata.name.startswith(name_base)
         assert start_mock.call_args[0][0].metadata.name != name_base
 
+    def test_pod_name_required(self):
+        with pytest.raises(AirflowException, match="`name` is required"):
+            KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo 10"],
+                labels={"foo": "bar"},
+                task_id="task",
+                in_cluster=False,
+                do_xcom_push=False,
+                cluster_context='default',
+            )
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_full_pod_spec(self, mock_client, monitor_mock, start_mock):
+        from airflow.utils.state import State
+
+        pod_spec = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name="hello", labels={"foo": "bar"}, namespace="mynamespace"),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        image="ubuntu:16.04",
+                        command=["something"],
+                    )
+                ]
+            ),
+        )
+
+        k = KubernetesPodOperator(
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            full_pod_spec=pod_spec,
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
+        k.execute(context=context)
+
+        assert start_mock.call_args[0][0].metadata.name == pod_spec.metadata.name
+        assert start_mock.call_args[0][0].metadata.labels == pod_spec.metadata.labels
+        assert start_mock.call_args[0][0].metadata.namespace == pod_spec.metadata.namespace
+        assert start_mock.call_args[0][0].spec.containers[0].image == pod_spec.spec.containers[0].image
+        assert start_mock.call_args[0][0].spec.containers[0].command == pod_spec.spec.containers[0].command
+
+        # kwargs take precedence, however
+        start_mock.reset_mock()
+        image = "some.custom.image:andtag"
+        name_base = "world"
+        k = KubernetesPodOperator(
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            full_pod_spec=pod_spec,
+            name=name_base,
+            image=image,
+        )
+        context = self.create_context(k)
+        k.execute(context=context)
+
+        assert start_mock.call_args[0][0].metadata.name.startswith(name_base)
+        assert start_mock.call_args[0][0].metadata.name != name_base
+        assert start_mock.call_args[0][0].spec.containers[0].image == image
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_pod_template_file(self, mock_client, monitor_mock, start_mock):
+        from airflow.utils.state import State

Review comment:
       I'm planning a follow-up PR with more cleanup. I don't want to muddy the waters of this review, so I'm following the same pattern as the existing tests here.







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613485399



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########

Review comment:
       We should probably just move this import to the top level of this file. I know that, for some reason, we import this class inside every test.







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613490947



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########

Review comment:
       Sure, works for me







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613480581



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -415,8 +415,11 @@ def _try_numbers_match(context, pod) -> bool:
         return pod.metadata.labels['try_number'] == context['ti'].try_number
 
     def _set_name(self, name):
-        if name is None:
+        if name is None and (self.pod_template_file or self.full_pod_spec):
             return None
+        if name is None:
+            raise AirflowException("`name` is required unless `pod_template_file` or `full_pod_spec` is set")

Review comment:
       ```suggestion
           if name is None:
               if self.pod_template_file or self.full_pod_spec:
                   return None
               raise AirflowException("`name` is required unless `pod_template_file` or `full_pod_spec` is set")
   ```
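       The suggested restructuring should not change behaviour. As a rough check,
       the sketch below runs both shapes of the `None` handling over every
       combination of set/unset arguments and compares the results. It is a
       standalone sketch: both functions are hypothetical copies of the two
       shapes, `AirflowException` is a local stand-in, and because the code after
       the `None` checks is not part of this hunk, both simply `return name`
       there (an assumption).

```python
import itertools


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


MESSAGE = "`name` is required unless `pod_template_file` or `full_pod_spec` is set"


def set_name_original(name, pod_template_file, full_pod_spec):
    # Shape as first written in the PR: two sibling `name is None` checks.
    if name is None and (pod_template_file or full_pod_spec):
        return None
    if name is None:
        raise AirflowException(MESSAGE)
    return name


def set_name_suggested(name, pod_template_file, full_pod_spec):
    # Shape from the review suggestion: a single `name is None` check, nested.
    if name is None:
        if pod_template_file or full_pod_spec:
            return None
        raise AirflowException(MESSAGE)
    return name


def outcome(fn, *args):
    """Return ("ok", value) or ("raised", message) so results are comparable."""
    try:
        return ("ok", fn(*args))
    except AirflowException as err:
        return ("raised", str(err))


def forms_agree() -> bool:
    names = [None, "hello"]
    templates = [None, "pod.yaml"]
    specs = [None, {"metadata": {"name": "hello"}}]
    return all(
        outcome(set_name_original, *combo) == outcome(set_name_suggested, *combo)
        for combo in itertools.product(names, templates, specs)
    )


if __name__ == "__main__":
    print(forms_agree())  # True
```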







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613484770



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -234,6 +235,133 @@ def test_randomize_pod_name(self, mock_client, monitor_mock, start_mock):
         assert start_mock.call_args[0][0].metadata.name.startswith(name_base)
         assert start_mock.call_args[0][0].metadata.name != name_base
 
+    def test_pod_name_required(self):
+        with pytest.raises(AirflowException, match="`name` is required"):
+            KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo 10"],
+                labels={"foo": "bar"},
+                task_id="task",
+                in_cluster=False,
+                do_xcom_push=False,
+                cluster_context='default',
+            )
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_full_pod_spec(self, mock_client, monitor_mock, start_mock):
+        from airflow.utils.state import State
+
+        pod_spec = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name="hello", labels={"foo": "bar"}, namespace="mynamespace"),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        image="ubuntu:16.04",
+                        command=["something"],
+                    )
+                ]
+            ),
+        )
+
+        k = KubernetesPodOperator(
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            full_pod_spec=pod_spec,
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
+        k.execute(context=context)
+
+        assert start_mock.call_args[0][0].metadata.name == pod_spec.metadata.name
+        assert start_mock.call_args[0][0].metadata.labels == pod_spec.metadata.labels
+        assert start_mock.call_args[0][0].metadata.namespace == pod_spec.metadata.namespace
+        assert start_mock.call_args[0][0].spec.containers[0].image == pod_spec.spec.containers[0].image
+        assert start_mock.call_args[0][0].spec.containers[0].command == pod_spec.spec.containers[0].command
+
+        # kwargs take precedence, however
+        start_mock.reset_mock()
+        image = "some.custom.image:andtag"
+        name_base = "world"
+        k = KubernetesPodOperator(
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            full_pod_spec=pod_spec,
+            name=name_base,
+            image=image,
+        )
+        context = self.create_context(k)
+        k.execute(context=context)
+
+        assert start_mock.call_args[0][0].metadata.name.startswith(name_base)
+        assert start_mock.call_args[0][0].metadata.name != name_base

Review comment:
       Can you add a single-line comment in the code to say why we assert this?
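       The two assertions together check that an explicit `name` is randomized:
       the pod name keeps the user's value as a prefix but is not that value
       verbatim. A one-line comment could say something like "# name is
       randomized: it keeps name_base as a prefix but gets a unique suffix". The
       sketch below illustrates the idea with a hypothetical `randomize_pod_name`;
       the `-` separator, the uuid4 hex suffix and the `max_len=63` limit are
       illustrative assumptions, not necessarily the operator's exact scheme.

```python
import uuid


def randomize_pod_name(name_base: str, max_len: int = 63) -> str:
    """Hypothetical stand-in for the operator's pod-name randomization."""
    suffix = uuid.uuid4().hex  # 32 random hex characters, different on each call
    if max_len <= len(suffix) + 1:
        raise ValueError("max_len leaves no room for the base name")
    # Truncate the base so that base + "-" + suffix fits within max_len.
    base = name_base[: max_len - len(suffix) - 1]
    return f"{base}-{suffix}"


if __name__ == "__main__":
    name_base = "world"
    pod_name = randomize_pod_name(name_base)
    # Same two checks as the test: the prefix survives, the literal value does not.
    assert pod_name.startswith(name_base)
    assert pod_name != name_base
    print(pod_name)
```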







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613481041



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -415,8 +415,11 @@ def _try_numbers_match(context, pod) -> bool:
         return pod.metadata.labels['try_number'] == context['ti'].try_number
 
     def _set_name(self, name):
-        if name is None:
+        if name is None and (self.pod_template_file or self.full_pod_spec):
             return None
+        if name is None:
+            raise AirflowException("`name` is required unless `pod_template_file` or `full_pod_spec` is set")

Review comment:
       (just a nit)







[GitHub] [airflow] kaxil commented on a change in pull request #15373: Require `name` with KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15373:
URL: https://github.com/apache/airflow/pull/15373#discussion_r613485399



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########

Review comment:
       We should probably just move this import to the top level of this file. I know that, for some reason, we import this class inside every test in this file.



