Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/09 20:43:14 UTC

[airflow] branch v2-0-test updated (57a8afd -> 7d192de)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 57a8afd  Fixed #14270: Add error message in OOM situations (#15207)
     new 50a1666  Fix celery executor bug trying to call len on map (#14883)
     new 091fae9  Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)
     new ce0ca24  Update import path and fix typo in `dag-run.rst` (#15201)
     new 7d192de  Fix mistake and typos in doc/docstrings (#15180)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../example_kubernetes_executor_config.py          |  3 +-
 airflow/executors/celery_executor.py               |  3 +
 airflow/executors/kubernetes_executor.py           |  2 +-
 .../basic_template.yaml                            |  4 +-
 airflow/providers/apache/hive/hooks/hive.py        |  2 +-
 airflow/utils/timezone.py                          |  4 +-
 docs/apache-airflow/dag-run.rst                    |  8 +-
 docs/apache-airflow/executor/kubernetes.rst        |  2 +-
 .../basic_template.yaml}                           | 19 +++--
 tests/executors/test_kubernetes_executor.py        | 91 +++++++++++++++++++++-
 10 files changed, 118 insertions(+), 20 deletions(-)
 copy tests/{kubernetes/basic_pod.yaml => executors/kubernetes_executor_template_files/basic_template.yaml} (75%)

[airflow] 01/04: Fix celery executor bug trying to call len on map (#14883)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 50a16666edc9026f35433a41916d4900557bb85a
Author: Ryan Hatter <25...@users.noreply.github.com>
AuthorDate: Tue Apr 6 05:21:38 2021 -0400

    Fix celery executor bug trying to call len on map (#14883)
    
    Co-authored-by: RNHTTR <ry...@wiftapp.com>
    (cherry picked from commit 4ee442970873ba59ee1d1de3ac78ef8e33666e0f)
---
 airflow/executors/celery_executor.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 2d0e915..a4ddfab 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -542,6 +542,9 @@ class BulkStateFetcher(LoggingMixin):
     def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
         return {a.task_id for a in async_tasks}
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
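For context on the bug class named in the commit title: in Python 3, map() returns a lazy iterator with no __len__, so any code path that hands a map object to len() raises a TypeError at runtime. A minimal sketch of the pitfall and the usual remedy (illustrative only, not the committed fix):

    task_ids = ["task_a", "task_b"]

    lazy_results = map(str.upper, task_ids)  # map() is lazy in Python 3
    try:
        len(lazy_results)
    except TypeError as err:
        print(err)  # object of type 'map' has no len()

    # Materialize the iterator before anything needs to size it:
    results = list(map(str.upper, task_ids))
    print(len(results))  # 2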

[airflow] 04/04: Fix mistake and typos in doc/docstrings (#15180)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7d192de5292e8e67f7d730bc628733cb395fa3ed
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Sun Apr 4 19:44:03 2021 +0200

    Fix mistake and typos in doc/docstrings (#15180)
    
    - Fix an apparent mistake in doc relating to catchup
    - Fix typo pickable (should be picklable)
    
    (cherry picked from commit 53dafa593fd7ce0be2a48dc9a9e993bb42b6abc5)
---
 airflow/providers/apache/hive/hooks/hive.py | 2 +-
 airflow/utils/timezone.py                   | 4 ++--
 docs/apache-airflow/dag-run.rst             | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index ab7b7b7..d261ab2 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -487,7 +487,7 @@ class HiveMetastoreHook(BaseHook):
 
     def __getstate__(self) -> Dict[str, Any]:
         # This is for pickling to work despite the thrift hive client not
-        # being pickable
+        # being picklable
         state = dict(self.__dict__)
         del state['metastore']
         return state
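The comment corrected above refers to the standard pickling workaround this hook uses: __getstate__ drops the unpicklable thrift client from the instance dict before pickling, and a companion __setstate__ can rebuild it afterwards. A generic sketch of the pattern, with illustrative names rather than the hook's actual attributes:

    import pickle

    class ClientWrapper:
        def __init__(self):
            self.config = {"host": "localhost"}
            self.client = self._connect()  # stand-in for an unpicklable connection

        def _connect(self):
            return object()

        def __getstate__(self):
            state = dict(self.__dict__)
            del state["client"]  # exclude the unpicklable attribute
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)
            self.client = self._connect()  # rebuild the connection on unpickle

    restored = pickle.loads(pickle.dumps(ClientWrapper()))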
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index d302cbe..09736e5 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -56,7 +56,7 @@ def utcnow() -> dt.datetime:
     :return:
     """
     # pendulum utcnow() is not used as that sets a TimezoneInfo object
-    # instead of a Timezone. This is not pickable and also creates issues
+    # instead of a Timezone. This is not picklable and also creates issues
     # when using replace()
     result = dt.datetime.utcnow()
     result = result.replace(tzinfo=utc)
@@ -71,7 +71,7 @@ def utc_epoch() -> dt.datetime:
     :return:
     """
     # pendulum utcnow() is not used as that sets a TimezoneInfo object
-    # instead of a Timezone. This is not pickable and also creates issues
+    # instead of a Timezone. This is not picklable and also creates issues
     # when using replace()
     result = dt.datetime(1970, 1, 1)
     result = result.replace(tzinfo=utc)
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index dbcf68a..0752990 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -80,7 +80,7 @@ An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``sched
 series of intervals which the scheduler turns into individual DAG Runs and executes. The scheduler, by default, will
 kick off a DAG Run for any interval that has not been run since the last execution date (or has been cleared). This concept is called Catchup.
 
-If your DAG is written to handle its catchup (i.e., not limited to the interval, but instead to ``Now`` for instance.),
+If your DAG is not written to handle its catchup (i.e., not limited to the interval, but instead to ``Now`` for instance.),
 then you will want to turn catchup off. This can be done by setting ``catchup = False`` in DAG  or ``catchup_by_default = False``
 in the configuration file. When turned off, the scheduler creates a DAG run only for the latest interval.
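As a quick illustration of the corrected sentence, disabling catchup per DAG looks like this (a minimal sketch using the same Airflow 2.0-era API that appears elsewhere in this thread; the DAG id is hypothetical):

    from airflow import DAG
    from airflow.utils.dates import days_ago

    # With catchup=False the scheduler creates a run only for the latest
    # interval instead of backfilling every interval since start_date.
    dag = DAG(
        "no_catchup_example",
        schedule_interval="@daily",
        start_date=days_ago(30),
        catchup=False,
    )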
 

[airflow] 03/04: Update import path and fix typo in `dag-run.rst` (#15201)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ce0ca248a9279f308e2961ca419ae6555640013b
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Mon Apr 5 14:46:58 2021 +0300

    Update import path and fix typo in `dag-run.rst` (#15201)
    
    1. fix typo parametrized ->  parameterized
    2. update `from airflow.operators.bash_operator import BashOperator` -> `from airflow.operators.bash import BashOperator`
    
    (cherry picked from commit 4099108f554130cf3f87ba33b9d6084a74e70231)
---
 docs/apache-airflow/dag-run.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 72204f1..dbcf68a 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -208,10 +208,10 @@ Example of a parameterized DAG:
 .. code-block:: python
 
     from airflow import DAG
-    from airflow.operators.bash_operator import BashOperator
+    from airflow.operators.bash import BashOperator
     from airflow.utils.dates import days_ago
 
-    dag = DAG("example_parametrized_dag", schedule_interval=None, start_date=days_ago(2))
+    dag = DAG("example_parameterized_dag", schedule_interval=None, start_date=days_ago(2))
 
     parameterized_task = BashOperator(
         task_id='parameterized_task',
@@ -227,7 +227,7 @@ Using CLI
 
 .. code-block:: bash
 
-    airflow dags trigger --conf '{"conf1": "value1"}' example_parametrized_dag
+    airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
 
 Using UI
 ^^^^^^^^^^
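For completeness, the conf payload passed with --conf above is available to the triggered run through templating. A minimal sketch of a task consuming it, modeled on the doc example this diff touches (the bash command is an assumption, not quoted from the file):

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago

    dag = DAG("example_parameterized_dag", schedule_interval=None, start_date=days_ago(2))

    # The value supplied via --conf '{"conf1": "value1"}' renders into the command.
    parameterized_task = BashOperator(
        task_id="parameterized_task",
        bash_command="echo value: {{ dag_run.conf['conf1'] }}",
        dag=dag,
    )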

[airflow] 02/04: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 091fae90a0a564e2da92ead7dd5be2c1e8b56301
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Apr 5 16:56:00 2021 +0100

    Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)
    
    This feature was added in https://github.com/apache/airflow/pull/11784 but
    it was broken as it got `pod_template_override` from `executor_config`
    instead of `pod_template_file`.
    
    closes #14199
    
    (cherry picked from commit 5606137ba32c0daa87d557301d82f7f2bdc0b0a4)
---
 .../example_kubernetes_executor_config.py          |  3 +-
 airflow/executors/kubernetes_executor.py           |  2 +-
 .../basic_template.yaml                            |  4 +-
 docs/apache-airflow/executor/kubernetes.rst        |  2 +-
 .../basic_template.yaml                            | 34 ++++++++
 tests/executors/test_kubernetes_executor.py        | 91 +++++++++++++++++++++-
 6 files changed, 130 insertions(+), 6 deletions(-)
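To make the user-facing effect of this fix concrete: with the corrected key lookup, a per-task executor_config such as the following actually selects the custom template, and pod_override is then applied on top of it. This is a sketch modeled on the example DAG changed below; the DAG id, callable, and template path are illustrative:

    from kubernetes.client import models as k8s

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago

    def print_stuff():
        print("stuff")

    with DAG("pod_template_demo", schedule_interval=None, start_date=days_ago(2)):
        task_with_template = PythonOperator(
            task_id="task_with_template",
            python_callable=print_stuff,
            executor_config={
                # Before this fix the executor read "pod_template_override",
                # so a per-task "pod_template_file" was silently ignored.
                "pod_template_file": "/path/to/pod_templates/basic_template.yaml",
                "pod_override": k8s.V1Pod(
                    metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
                ),
            },
        )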

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index cbd69cb..5290dd8 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -24,6 +24,7 @@ import os
 from airflow import DAG
 from airflow.example_dags.libs.helper import print_stuff
 from airflow.operators.python import PythonOperator
+from airflow.settings import AIRFLOW_HOME
 from airflow.utils.dates import days_ago
 
 default_args = {
@@ -110,7 +111,7 @@ try:
             task_id="task_with_template",
             python_callable=print_stuff,
             executor_config={
-                "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+                "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
                 "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
             },
         )
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7e3d82b..ec7cbf7 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -496,7 +496,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             return
 
         if executor_config:
-            pod_template_file = executor_config.get("pod_template_override", None)
+            pod_template_file = executor_config.get("pod_template_file", None)
         else:
             pod_template_file = None
         if not self.task_queue:
diff --git a/airflow/kubernetes_executor_templates/basic_template.yaml b/airflow/kubernetes_executor_templates/basic_template.yaml
index a953867..a6eb83f 100644
--- a/airflow/kubernetes_executor_templates/basic_template.yaml
+++ b/airflow/kubernetes_executor_templates/basic_template.yaml
@@ -69,8 +69,8 @@ spec:
         defaultMode: 420
   restartPolicy: Never
   terminationGracePeriodSeconds: 30
-  serviceAccountName: airflow-worker-serviceaccount
-  serviceAccount: airflow-worker-serviceaccount
+  serviceAccountName: airflow-worker
+  serviceAccount: airflow-worker
   securityContext:
     runAsUser: 50000
     fsGroup: 50000
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 217a29c..61d13f4 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar.
     :end-before: [END task_with_sidecar]
 
 You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.
+This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
 
 Here is an example of a task with both features:
 
diff --git a/tests/executors/kubernetes_executor_template_files/basic_template.yaml b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
new file mode 100644
index 0000000..1fb00f2
--- /dev/null
+++ b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+kind: Pod
+apiVersion: v1
+metadata:
+  name: dummy-name-dont-delete
+  namespace: dummy-name-dont-delete
+  labels:
+    mylabel: foo
+spec:
+  containers:
+    - name: base
+      image: dummy-name-dont-delete
+  securityContext:
+    runAsUser: 50000
+    fsGroup: 50000
+  imagePullSecrets:
+    - name: airflow-registry
+  schedulerName: default-scheduler
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 68b0006..8d3d5b4 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import pathlib
 import random
 import re
 import string
@@ -22,6 +23,7 @@ import unittest
 from datetime import datetime
 from unittest import mock
 
+import pytest
 from kubernetes.client import models as k8s
 from urllib3 import HTTPResponse
 
@@ -39,7 +41,7 @@ try:
         get_base_pod_from_template,
     )
     from airflow.kubernetes import pod_generator
-    from airflow.kubernetes.pod_generator import PodGenerator
+    from airflow.kubernetes.pod_generator import PodGenerator, datetime_to_label_safe_datestring
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -215,6 +217,93 @@ class TestKubernetesExecutor(unittest.TestCase):
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            execution_date = datetime.utcnow()
+
+            executor.execute_async(
+                key=('dag', 'task', execution_date, 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(
+                        metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+                        spec=k8s.V1PodSpec(
+                            containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+                        ),
+                    ),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': execution_date.isoformat(),
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': datetime_to_label_safe_datestring(execution_date),
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                image="airflow:3.6",
+                                args=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                                env=[k8s.V1EnvVar(name='AIRFLOW_IS_K8S_EXECUTOR_POD', value='True')],
+                            )
+                        ],
+                        image_pull_secrets=[k8s.V1LocalObjectReference(name='airflow-registry')],
+                        scheduler_name='default-scheduler',
+                        security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
+                    ),
+                )
+            )
+
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):