Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/18 22:45:54 UTC

[airflow] branch v1-10-test updated (2f306e8 -> 2b25f01)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 2f306e8  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard 8e74a20  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard 721d50a  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard 52966ff  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard c1ce767  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard 924e370  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard 4b696b1  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard c240924  fixup!  Add ability to specify pod_template_file in executor_config (#11784)
 discard c26b5c7  Add ability to specify pod_template_file in executor_config (#11784)
     new bb78ab0  Add ability to specify pod_template_file in executor_config (#11784)
     new 5b455d9  Add permission "extra_links" for Viewer role and above (#10719)
     new 97396c7  Log instead of raise an Error for unregistered OperatorLinks (#11959)
     new 8482ace  [Doc] Correct description for macro task_instance_key_str (#11062)
     new d488aaf  Security upgrade lodash from 4.17.19 to 4.17.20 (#11095)
     new bbbcfcb  Fix Entrypoint and _CMD config variables (#12411)
     new 2b25f01  Fix issues with Gantt View (#12419)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2f306e8)
            \
             N -- N -- N   refs/heads/v1-10-test (2b25f01)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/serialization/serialized_objects.py   |  3 +-
 airflow/www_rbac/package.json                 |  4 +--
 airflow/www_rbac/security.py                  |  1 +
 airflow/www_rbac/views.py                     | 46 ++++++++++++---------------
 airflow/www_rbac/yarn.lock                    |  5 +++
 docs/macros-ref.rst                           |  2 +-
 scripts/in_container/prod/entrypoint_prod.sh  | 30 +++++++++++------
 tests/serialization/test_dag_serialization.py | 36 ++++++++++++++++++++-
 tests/www_rbac/test_views.py                  | 19 +++++++++++
 9 files changed, 107 insertions(+), 39 deletions(-)


[airflow] 02/07: Add permission "extra_links" for Viewer role and above (#10719)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5b455d98f6f703ce1cf9b4a8b0de29f9e5b12bed
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Sun Sep 6 12:26:08 2020 -0400

    Add permission "extra_links" for Viewer role and above (#10719)
    
    This change adds 'can extra links on Airflow' to the Viewer role and above. Currently, only Admins can see extra links by default.
    
    (cherry picked from commit 59f9a4116a310f7cf675c09a0aba49b186494d9b)
---
 airflow/www_rbac/security.py |  1 +
 tests/www_rbac/test_views.py | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/airflow/www_rbac/security.py b/airflow/www_rbac/security.py
index 0108081..d44cf70 100644
--- a/airflow/www_rbac/security.py
+++ b/airflow/www_rbac/security.py
@@ -112,6 +112,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         'can_rendered',
         'can_pickle_info',
         'can_version',
+        'can_extra_links'
     }
 
     USER_PERMS = {
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 4e06b57..9b019bd 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -2322,6 +2322,25 @@ class TestExtraLinks(TestBase):
 
     def tearDown(self):
         super(TestExtraLinks, self).tearDown()
+        self.logout()
+        self.login()
+
+    def login(self):
+        role_viewer = self.appbuilder.sm.find_role('Viewer')
+        test_viewer = self.appbuilder.sm.find_user(username='test_viewer')
+        if not test_viewer:
+            self.appbuilder.sm.add_user(
+                username='test_viewer',
+                first_name='test_viewer',
+                last_name='test_viewer',
+                email='test_viewer@fab.org',
+                role=role_viewer,
+                password='test_viewer')
+
+        return self.client.post('/login/', data=dict(
+            username='test_viewer',
+            password='test_viewer'
+        ))
 
     @mock.patch('airflow.www_rbac.views.dagbag.get_dag')
     def test_extra_links_works(self, get_dag_function):
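
As context for the permission change above, the links in question come from operator
extra links defined on individual operators. A minimal sketch of such a link (the
class, link name, and URL below are made up for illustration and are not part of
this commit):

    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink

    class DocsLink(BaseOperatorLink):
        """Hypothetical extra link shown in the task instance popup."""
        name = 'Docs'

        def get_link(self, operator, dttm):
            # Real links are usually built from the operator and execution date.
            return 'https://airflow.apache.org/docs/'

    class MyOperator(BaseOperator):
        """Operator exposing the hypothetical link."""
        operator_extra_links = [DocsLink()]

        def execute(self, context):
            pass

With the new 'can_extra_links' permission, users holding the Viewer role and above
can follow such links from the UI, where previously only Admins could.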


[airflow] 01/07: Add ability to specify pod_template_file in executor_config (#11784)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bb78ab015d97bbe5ca123af0c1ebdf4879d9cd97
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Nov 5 14:48:05 2020 -0800

    Add ability to specify pod_template_file in executor_config (#11784)
    
    * Add pod_template_override to executor_config
    
    Users will be able to override the base pod_template_file on a per-task
    basis.
    
    * change docstring
    
    * fix doc
    
    * fix static checks
    
    * add description
    
    (cherry picked from commit 68ba54bbd5a275fba1a126f8e67bd69e5cf4b362)
---
 .../example_kubernetes_executor_config.py          |  95 +++++++++++++++---
 airflow/executors/kubernetes_executor.py           |  53 +++++++---
 airflow/kubernetes/pod_generator.py                |  26 ++++-
 .../dags_in_image_template.yaml                    |  84 ++++++++++++++++
 .../dags_in_volume_template.yaml                   |  81 ++++++++++++++++
 .../git_sync_template.yaml                         | 107 +++++++++++++++++++++
 airflow/serialization/enums.py                     |   2 +
 airflow/serialization/serialized_objects.py        |  18 ++++
 chart/requirements.lock                            |   6 +-
 docs/executor/kubernetes.rst                       |  95 ++++++++++++++++++
 tests/executors/test_kubernetes_executor.py        |  26 ++++-
 tests/serialization/test_dag_serialization.py      |   8 ++
 12 files changed, 563 insertions(+), 38 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index 2e4ba00..e3f42d0 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -22,6 +22,8 @@ This is an example dag for using a Kubernetes Executor Configuration.
 from __future__ import print_function
 
 import os
+from kubernetes.client import models as k8s
+
 
 from airflow.contrib.example_dags.libs.helper import print_stuff
 from airflow.models import DAG
@@ -40,6 +42,20 @@ with DAG(
     schedule_interval=None
 ) as dag:
 
+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError("Error when checking volume mount. Return code {return_code}"
+                                     .format(return_code=return_code))
+            except ValueError as e:
+                if i > 4:
+                    raise e
+
     def test_volume_mount():
         """
         Tests whether the volume has been mounted.
@@ -61,27 +77,74 @@ with DAG(
         }
     )
 
+    # [START task_with_volume]
+
     # You can mount volume or secret to the worker pod
     second_task = PythonOperator(
         task_id="four_task",
         python_callable=test_volume_mount,
         executor_config={
-            "KubernetesExecutor": {
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ]
-            }
-        }
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        },
+    )
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    task_with_template = PythonOperator(
+        task_id="task_with_template",
+        python_callable=print_stuff,
+        executor_config={
+            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+        },
+    )
+    # [END task_with_template]
+
+    # [START task_with_sidecar]
+    sidecar_task = PythonOperator(
+        task_id="task_with_sidecar",
+        python_callable=test_sharedvolume_mount,
+        executor_config={
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        },
     )
+    # [END task_with_sidecar]
 
     # Test that we can add labels to pods
     third_task = PythonOperator(
@@ -111,3 +174,5 @@ with DAG(
 
     start_task >> second_task >> third_task
     start_task >> other_ns_task
+    start_task >> sidecar_task
+    start_task >> task_with_template
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 73dd91e..bdbd1cb 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -30,9 +30,9 @@ from queue import Empty
 
 import kubernetes
 from dateutil import parser
-from kubernetes import watch, client
+from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
-from urllib3.exceptions import HTTPError, ReadTimeoutError
+from urllib3.exceptions import ReadTimeoutError
 
 from airflow import settings
 from airflow.configuration import conf
@@ -427,11 +427,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
         status
         """
         self.log.info('Kubernetes job is %s', str(next_job))
-        key, command, kube_executor_config = next_job
+        key, command, kube_executor_config, pod_template_file = next_job
         dag_id, task_id, execution_date, try_number = key
 
         if command[0:2] != ["airflow", "run"]:
-            raise ValueError('The command must start with ["airflow", "run"].')
+            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
+
+        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)
+
+        if not base_worker_pod:
+            raise AirflowException(
+                "could not find a valid worker template yaml at {}".format(self.kube_config.pod_template_file)
+            )
 
         pod = PodGenerator.construct_pod(
             namespace=self.namespace,
@@ -662,6 +669,21 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self._manager.shutdown()
 
 
+def get_base_pod_from_template(pod_template_file, kube_config):
+    """
+    Reads either the pod_template_file set in the executor_config or the base pod_template_file
+    set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor
+
+    :param pod_template_file: absolute path to a pod_template_file.yaml or None
+    :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
+    :return: a V1Pod that can be used as the base pod for k8s tasks
+    """
+    if pod_template_file:
+        return PodGenerator.deserialize_model_file(pod_template_file)
+    else:
+        return PodGenerator.deserialize_model_file(kube_config.pod_template_file)
+
+
 class KubernetesExecutor(BaseExecutor, LoggingMixin):
     """Executor for Kubernetes"""
     def __init__(self):
@@ -794,7 +816,11 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         )
 
         kube_executor_config = PodGenerator.from_obj(executor_config)
-        self.task_queue.put((key, command, kube_executor_config))
+        if executor_config:
+            pod_template_file = executor_config.get("pod_template_override", None)
+        else:
+            pod_template_file = None
+        self.task_queue.put((key, command, kube_executor_config, pod_template_file))
 
     def sync(self):
         """Synchronize task state."""
@@ -831,13 +857,16 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    self.log.warning('ApiException when attempting to run task, re-queueing. '
-                                     'Message: %s', json.loads(e.body)['message'])
-                    self.task_queue.put(task)
-                except HTTPError as e:
-                    self.log.warning('HTTPError when attempting to run task, re-queueing. '
-                                     'Exception: %s', str(e))
-                    self.task_queue.put(task)
+                    if e.reason == "BadRequest":
+                        self.log.error("Request was invalid. Failing task")
+                        key, _, _, _ = task
+                        self.change_state(key, State.FAILED, e)
+                    else:
+                        self.log.warning(
+                            'ApiException when attempting to run task, re-queueing. ' 'Message: %s',
+                            json.loads(e.body)['message'],
+                        )
+                        self.task_queue.put(task)
                 finally:
                     self.task_queue.task_done()
             except Empty:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5a57230..e12fba4 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -564,6 +564,18 @@ class PodGenerator(object):
         return reduce(PodGenerator.reconcile_pods, pod_list)
 
     @staticmethod
+    def serialize_pod(pod):
+        """
+
+        Converts a k8s.V1Pod into a jsonified object
+
+        @param pod:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client.sanitize_for_serialization(pod)
+
+    @staticmethod
     def deserialize_model_file(path):
         """
         :param path: Path to the file
@@ -573,15 +585,23 @@ class PodGenerator(object):
         ``_ApiClient__deserialize_model`` from the kubernetes client.
         This issue is tracked here; https://github.com/kubernetes-client/python/issues/977.
         """
-        api_client = ApiClient()
         if os.path.exists(path):
             with open(path) as stream:
                 pod = yaml.safe_load(stream)
         else:
             pod = yaml.safe_load(path)
 
-        # pylint: disable=protected-access
-        return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+        return PodGenerator.deserialize_model_dict(pod)
+
+    @staticmethod
+    def deserialize_model_dict(pod_dict):
+        """
+        Deserializes python dictionary to k8s.V1Pod
+        @param pod_dict:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)  # pylint: disable=W0212
 
 
 def merge_objects(base_obj, client_obj):
diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
new file mode 100644
index 0000000..b1995c2
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# [START template_with_dags_in_image]
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: false
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+          subPath: repo/tests/dags
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      persistentVolumeClaim:
+        claimName: RELEASE-NAME-dags
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END template_with_dags_in_image]
diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
new file mode 100644
index 0000000..86b5358
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# [START template_with_dags_in_volume]
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+          subPath: repo/tests/dags
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      persistentVolumeClaim:
+        claimName: RELEASE-NAME-dags
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END template_with_dags_in_volume]
diff --git a/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
new file mode 100644
index 0000000..a962a8f
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# [START git_sync_template]
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  initContainers:
+    - name: git-sync
+      image: "k8s.gcr.io/git-sync:v3.1.6"
+      env:
+        - name: GIT_SYNC_REV
+          value: "HEAD"
+        - name: GIT_SYNC_BRANCH
+          value: "v1-10-stable"
+        - name: GIT_SYNC_REPO
+          value: "https://github.com/apache/airflow.git"
+        - name: GIT_SYNC_DEPTH
+          value: "1"
+        - name: GIT_SYNC_ROOT
+          value: "/git"
+        - name: GIT_SYNC_DEST
+          value: "repo"
+        - name: GIT_SYNC_ADD_USER
+          value: "true"
+        - name: GIT_SYNC_WAIT
+          value: "60"
+        - name: GIT_SYNC_MAX_SYNC_FAILURES
+          value: "0"
+      volumeMounts:
+        - name: dags
+          mountPath: /git
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: false
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      emptyDir: {}
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END git_sync_template]
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 8e5fee6..e4f72c5 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -36,10 +36,12 @@ class DagAttributeTypes(str, Enum):
     """Enum of supported attribute types of DAG."""
     DAG = 'dag'
     OP = 'operator'
+
     DATETIME = 'datetime'
     TIMEDELTA = 'timedelta'
     TIMEZONE = 'timezone'
     RELATIVEDELTA = 'relativedelta'
     DICT = 'dict'
+    POD = 'k8s.V1Pod'
     SET = 'set'
     TUPLE = 'tuple'
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index d959e92..6d1c30a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -47,6 +47,16 @@ except ImportError:
 if TYPE_CHECKING:
     from inspect import Parameter
 
+try:
+    # isort: off
+    from kubernetes.client import models as k8s
+    from airflow.kubernetes.pod_generator import PodGenerator
+
+    # isort: on
+    HAS_KUBERNETES = True
+except ImportError:
+    HAS_KUBERNETES = False
+
 log = logging.getLogger(__name__)
 
 
@@ -199,6 +209,9 @@ class BaseSerialization:
                 return [cls._serialize(v) for v in var]
             elif isinstance(var, DAG):
                 return SerializedDAG.serialize_dag(var)
+            elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
+                json_pod = PodGenerator.serialize_pod(var)
+                return cls._encode(json_pod, type_=DAT.POD)
             elif isinstance(var, BaseOperator):
                 return SerializedBaseOperator.serialize_operator(var)
             elif isinstance(var, cls._datetime_types):
@@ -253,6 +266,11 @@ class BaseSerialization:
             return SerializedBaseOperator.deserialize_operator(var)
         elif type_ == DAT.DATETIME:
             return pendulum.from_timestamp(var)
+        elif type_ == DAT.POD:
+            if not HAS_KUBERNETES:
+                raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
+            pod = PodGenerator.deserialize_model_dict(var)
+            return pod
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
diff --git a/chart/requirements.lock b/chart/requirements.lock
index e460e9f..eb62c80 100644
--- a/chart/requirements.lock
+++ b/chart/requirements.lock
@@ -1,6 +1,6 @@
 dependencies:
 - name: postgresql
-  repository: https://charts.helm.sh/stable/
+  repository: https://kubernetes-charts.storage.googleapis.com
   version: 6.3.12
-digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98
-generated: "2020-11-07T17:40:45.418723358+01:00"
+digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af
+generated: "2020-11-04T15:59:36.967913-08:00"
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index 042d638..ed172a8 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -34,6 +34,101 @@ The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes e
 
   - Another option is to use S3/GCS/etc to store logs
 
+To troubleshoot issues with the KubernetesExecutor, you can use the ``airflow kubernetes generate-dag-yaml`` command.
+This command generates the pods as they will be launched in Kubernetes and dumps them into yaml files for you to inspect.
+
+.. _concepts:pod_template_file:
+
+pod_template_file
+#################
+
+As of Airflow 1.10.12, you can now use the ``pod_template_file`` option in the ``kubernetes`` section
+of the ``airflow.cfg`` file to form the basis of your KubernetesExecutor pods. This process is faster to execute
+and easier to modify.
+
+We include multiple examples of working pod operators below, but we would also like to explain a few necessary components
+if you want to customize your template files. As long as you have these components, every other element
+in the template is customizable.
+
+1. Airflow will overwrite the base container image and the pod name
+
+There are two points where Airflow potentially overwrites the base image: in the ``airflow.cfg``
+or the ``pod_override`` (discussed below) setting. This value is overwritten to ensure that users do
+not need to update multiple template files every time they upgrade their docker image. The other field
+that Airflow overwrites is the ``pod.metadata.name`` field. This field has to be unique across all pods,
+so we generate these names dynamically before launch.
+
+It's important to note that while Airflow overwrites these fields, they **cannot be left blank**.
+If these fields do not exist, Kubernetes cannot load the YAML into a Kubernetes V1Pod.
+
+2. Each Airflow ``pod_template_file`` must have a container named "base" at the ``pod.spec.containers[0]`` position
+
+Airflow uses the ``pod_template_file`` by making certain assumptions about the structure of the template.
+When Airflow creates the worker pod's command, it assumes that the Airflow worker container exists
+at the beginning of the container array. It then assumes that the container is named ``base``
+when it merges this pod with internal configs. You are more than welcome to create
+sidecar containers after this required container.
+
+With these requirements in mind, here are some examples of basic ``pod_template_file`` YAML files.
+
+pod_template_file using the ``dag_in_image`` setting:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_image]
+    :end-before: [END template_with_dags_in_image]
+
+``pod_template_file`` which stores DAGs in a ``persistentVolume``:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_volume]
+    :end-before: [END template_with_dags_in_volume]
+
+``pod_template_file`` which pulls DAGs from git:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
+    :language: yaml
+    :start-after:  [START git_sync_template]
+    :end-before: [END git_sync_template]
+
+.. _concepts:pod_override:
+
+pod_override
+############
+
+When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
+To utilize this functionality, create a Kubernetes V1Pod object and fill in your desired overrides.
+Please note that the scheduler will override the ``metadata.name`` of the V1Pod before launching it.
+
+To overwrite the base container of the pod launched by the KubernetesExecutor,
+create a V1Pod with a single container, and overwrite the fields as follows:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_volume]
+    :end-before: [END task_with_volume]
+
+Note that volume mounts, environment variables, ports, and devices will all be extended instead of overwritten.
+
+To add a sidecar container to the launched pod, create a V1Pod with an empty first container with the
+name ``base`` and a second container containing your desired sidecar.
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_sidecar]
+    :end-before: [END task_with_sidecar]
+
+You can also create a custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
+This will replace the default ``pod_template_file`` named in ``airflow.cfg`` and then override that template using the ``pod_override``.
+
+Here is an example of a task with both features:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_template]
+    :end-before: [END task_with_template]
+
 KubernetesExecutor Architecture
 ################################
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 3dabb78..f5f415a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,12 +29,15 @@ from tests.compat import mock
 from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
-    from airflow import configuration  # noqa: F401
-    from airflow.configuration import conf  # noqa: F401
-    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
-    from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.kubernetes.pod_generator import PodGenerator
+
+    from airflow.executors.kubernetes_executor import (
+        AirflowKubernetesScheduler,
+        KubeConfig,
+        KubernetesExecutor,
+        get_base_pod_from_template,
+    )
     from airflow.kubernetes import pod_generator
+    from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -94,6 +97,19 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
             )
             self.assertTrue(self._is_valid_pod_id(pod_name))
 
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch("airflow.kubernetes.pod_generator.PodGenerator")
+    @mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
+    def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator):
+        pod_template_file_path = "/bar/biz"
+        get_base_pod_from_template(pod_template_file_path, None)
+        self.assertEqual("deserialize_model_dict", mock_generator.mock_calls[0][0])
+        self.assertEqual(pod_template_file_path, mock_generator.mock_calls[0][1][0])
+        mock_kubeconfig.pod_template_file = "/foo/bar"
+        get_base_pod_from_template(None, mock_kubeconfig)
+        self.assertEqual("deserialize_model_dict", mock_generator.mock_calls[1][0])
+        self.assertEqual("/foo/bar", mock_generator.mock_calls[1][1][0])
+
     def test_make_safe_label_value(self):
         for dag_id, task_id in self._cases():
             safe_dag_id = pod_generator.make_safe_label_value(dag_id)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 6b714a8..a0447b8 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -259,6 +259,14 @@ class TestStringifiedDAGs(unittest.TestCase):
 
         assert sorted_serialized_dag(ground_truth_dag) == sorted_serialized_dag(json_dag)
 
+    def test_deser_k8s_pod_override(self):
+        dag = collect_dags('airflow/example_dags')['example_kubernetes_executor_config']
+        serialized = SerializedDAG.to_json(dag)
+        deser_dag = SerializedDAG.from_json(serialized)
+        p1 = dag.tasks[1].executor_config
+        p2 = deser_dag.tasks[1].executor_config
+        self.assertDictEqual(p1['pod_override'].to_dict(), p2['pod_override'].to_dict())
+
     def test_deserialization_across_process(self):
         """A serialized DAG can be deserialized in another process."""
 


[airflow] 06/07: Fix Entrypoint and _CMD config variables (#12411)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bbbcfcba78dcc0252a0adeddd0a3dbd4d3637d37
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Nov 18 00:47:13 2020 +0000

    Fix Entrypoint and _CMD config variables (#12411)
    
    closes https://github.com/apache/airflow/issues/8705
    
    Co-Authored-By: Noël Bardelot <11...@users.noreply.github.com>
    (cherry picked from commit f4851f7d75d45f85204e3ceba77c8b8e479a1d7c)
---
 scripts/in_container/prod/entrypoint_prod.sh | 30 +++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/scripts/in_container/prod/entrypoint_prod.sh b/scripts/in_container/prod/entrypoint_prod.sh
index 6843919..60103e7 100755
--- a/scripts/in_container/prod/entrypoint_prod.sh
+++ b/scripts/in_container/prod/entrypoint_prod.sh
@@ -99,16 +99,28 @@ if ! whoami &> /dev/null; then
 fi
 
 
-# if no DB configured - use sqlite db by default
-AIRFLOW__CORE__SQL_ALCHEMY_CONN="${AIRFLOW__CORE__SQL_ALCHEMY_CONN:="sqlite:///${AIRFLOW_HOME}/airflow.db"}"
-
-verify_db_connection "${AIRFLOW__CORE__SQL_ALCHEMY_CONN}"
-
-AIRFLOW__CELERY__BROKER_URL=${AIRFLOW__CELERY__BROKER_URL:=}
+# Warning: command environment variables (*_CMD) have priority over usual configuration variables
+# for configuration parameters that require sensitive information. This is the case for the SQL database
+# and the broker backend in this entrypoint script.
+
+if [[ -n "${AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=}" ]]; then
+    verify_db_connection "$(eval "$AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD")"
+else
+    # if no DB configured - use sqlite db by default
+    AIRFLOW__CORE__SQL_ALCHEMY_CONN="${AIRFLOW__CORE__SQL_ALCHEMY_CONN:="sqlite:///${AIRFLOW_HOME}/airflow.db"}"
+    verify_db_connection "${AIRFLOW__CORE__SQL_ALCHEMY_CONN}"
+fi
 
-if [[ -n ${AIRFLOW__CELERY__BROKER_URL=} ]] && \
-        [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|worker|flower)$ ]]; then
-    verify_db_connection "${AIRFLOW__CELERY__BROKER_URL}"
+# Note: the broker backend configuration concerns only a subset of Airflow components
+if [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|worker|flower)$ ]]; then
+    if [[ -n "${AIRFLOW__CELERY__BROKER_URL_CMD=}" ]]; then
+        verify_db_connection "$(eval "$AIRFLOW__CELERY__BROKER_URL_CMD")"
+    else
+        AIRFLOW__CELERY__BROKER_URL=${AIRFLOW__CELERY__BROKER_URL:=}
+        if [[ -n ${AIRFLOW__CELERY__BROKER_URL=} ]]; then
+            verify_db_connection "${AIRFLOW__CELERY__BROKER_URL}"
+        fi
+    fi
 fi
 
 if [[ ${AIRFLOW_COMMAND} == "bash" ]]; then
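
The point of the change above is precedence: when a *_CMD variable is set, its output
is used for the connection check and the plain variable is ignored. The same logic,
expressed as a Python sketch for clarity (the helper below is hypothetical and only
illustrates the bash behavior in the entrypoint; it is not part of the image):

    import os
    import subprocess

    def resolve_sql_alchemy_conn():
        # A *_CMD variable, when present, is evaluated and takes priority.
        cmd = os.environ.get("AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD")
        if cmd:
            return subprocess.check_output(cmd, shell=True, text=True).strip()
        # Otherwise fall back to the plain variable, then to the sqlite default.
        airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
        return os.environ.get(
            "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
            "sqlite:///{}/airflow.db".format(airflow_home),
        )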


[airflow] 07/07: Fix issues with Gantt View (#12419)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2b25f015203a6b90ece1609efa28f9480274d0a5
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Nov 17 21:50:39 2020 +0000

    Fix issues with Gantt View (#12419)
    
    closes https://github.com/apache/airflow/issues/9813
    closes https://github.com/apache/airflow/issues/9633
    
    and does some cleanup
---
 airflow/www_rbac/views.py | 46 +++++++++++++++++++++-------------------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 9d46d03..8afcc27 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -1997,42 +1997,38 @@ class Airflow(AirflowBaseView):
             .all()
         ) for ti in tis]))
 
-        # determine bars to show in the gantt chart
-        # all reschedules of one attempt are combinded into one bar
-        gantt_bar_items = []
-
         tasks = []
         for ti in tis:
-            end_date = ti.end_date or timezone.utcnow()
             # prev_attempted_tries will reflect the currently running try_number
             # or the try_number of the last complete run
             # https://issues.apache.org/jira/browse/AIRFLOW-2143
-            try_count = ti.prev_attempted_tries
-            gantt_bar_items.append((ti.task_id, ti.start_date, end_date, ti.state, try_count))
-            d = alchemy_to_dict(ti)
-            d['extraLinks'] = dag.get_task(ti.task_id).extra_links
-            tasks.append(d)
+            try_count = ti.prev_attempted_tries if ti.prev_attempted_tries != 0 else ti.try_number
+            task_dict = alchemy_to_dict(ti)
+            task_dict['end_date'] = task_dict['end_date'] or timezone.utcnow()
+            task_dict['extraLinks'] = dag.get_task(ti.task_id).extra_links
+            task_dict['try_number'] = try_count
+            tasks.append(task_dict)
 
         tf_count = 0
         try_count = 1
         prev_task_id = ""
-        for tf in ti_fails:
-            end_date = tf.end_date or timezone.utcnow()
-            start_date = tf.start_date or end_date
-            if tf_count != 0 and tf.task_id == prev_task_id:
-                try_count = try_count + 1
+        for failed_task_instance in ti_fails:
+            if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
+                try_count += 1
             else:
                 try_count = 1
-            prev_task_id = tf.task_id
-            gantt_bar_items.append((tf.task_id, start_date, end_date, State.FAILED, try_count))
-            tf_count = tf_count + 1
-            task = dag.get_task(tf.task_id)
-            d = alchemy_to_dict(tf)
-            d['state'] = State.FAILED
-            d['operator'] = task.task_type
-            d['try_number'] = try_count
-            d['extraLinks'] = task.extra_links
-            tasks.append(d)
+            prev_task_id = failed_task_instance.task_id
+            tf_count += 1
+            task = dag.get_task(failed_task_instance.task_id)
+            task_dict = alchemy_to_dict(failed_task_instance)
+            end_date = task_dict['end_date'] or timezone.utcnow()
+            task_dict['end_date'] = end_date
+            task_dict['start_date'] = task_dict['start_date'] or end_date
+            task_dict['state'] = State.FAILED
+            task_dict['operator'] = task.task_type
+            task_dict['try_number'] = try_count
+            task_dict['extraLinks'] = task.extra_links
+            tasks.append(task_dict)
 
         data = {
             'taskNames': [ti.task_id for ti in tis],


[airflow] 04/07: [Doc] Correct description for macro task_instance_key_str (#11062)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8482ace0e0157b829db72b9d5fddfd758da699cd
Author: Xiaodong DENG <xd...@hotmail.com>
AuthorDate: Mon Sep 21 22:28:11 2020 +0200

    [Doc] Correct description for macro task_instance_key_str (#11062)
    
    Correction based on code https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py
    
    (cherry picked from commit 2afb20dffab9cc89244ebe54d6c7a7dd43dd4e63)
---
 docs/macros-ref.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/macros-ref.rst b/docs/macros-ref.rst
index eea8691..3e95e5a 100644
--- a/docs/macros-ref.rst
+++ b/docs/macros-ref.rst
@@ -70,7 +70,7 @@ Variable                                Description
                                         with deserialized JSON object, append the path to the
                                         key within the JSON object
 ``{{ task_instance_key_str }}``         a unique, human-readable key to the task instance
-                                        formatted ``{dag_id}_{task_id}_{ds}``
+                                        formatted ``{dag_id}__{task_id}__{ds_nodash}``
 ``{{ conf }}``                          the full configuration object located at
                                         ``airflow.configuration.conf`` which
                                         represents the content of your
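
For example, with dag_id "example_dag", task_id "print_key" and an execution date of
2020-11-18, the corrected format renders as example_dag__print_key__20201118
(double underscores, ds_nodash). A minimal sketch of using the macro in a templated
field (the DAG and task names are made up for illustration):

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    with DAG(
        dag_id="example_dag",
        start_date=datetime(2020, 11, 1),
        schedule_interval=None,
    ) as dag:
        print_key = BashOperator(
            task_id="print_key",
            # Renders to something like: example_dag__print_key__20201118
            bash_command="echo {{ task_instance_key_str }}",
        )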


[airflow] 05/07: Security upgrade lodash from 4.17.19 to 4.17.20 (#11095)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d488aaf41716ea5959fdeb655d9754fb885ed69a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Sep 23 10:06:28 2020 +0100

    Security upgrade lodash from 4.17.19 to 4.17.20 (#11095)
    
    Details: https://snyk.io/vuln/SNYK-JS-LODASH-590103
---
 airflow/www_rbac/package.json | 4 ++--
 airflow/www_rbac/yarn.lock    | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/airflow/www_rbac/package.json b/airflow/www_rbac/package.json
index 7087a42..4141afa 100644
--- a/airflow/www_rbac/package.json
+++ b/airflow/www_rbac/package.json
@@ -47,7 +47,7 @@
     "imports-loader": "^1.1.0",
     "jquery": ">=3.4.0",
     "js-yaml": "^3.14.0",
-    "lodash": "^4.17.19",
+    "lodash": "^4.17.20",
     "mini-css-extract-plugin": "0.9.0",
     "moment-locales-webpack-plugin": "^1.2.0",
     "style-loader": "^1.2.1",
@@ -69,7 +69,7 @@
     "eonasdan-bootstrap-datetimepicker": "^4.17.47",
     "jquery": ">=3.4.0",
     "js-yaml": "^3.14.0",
-    "lodash": "^4.17.19",
+    "lodash": "^4.17.20",
     "moment-timezone": "^0.5.28",
     "nvd3": "^1.8.6",
     "url-search-params-polyfill": "^8.1.0"
diff --git a/airflow/www_rbac/yarn.lock b/airflow/www_rbac/yarn.lock
index e247a38..4550616 100644
--- a/airflow/www_rbac/yarn.lock
+++ b/airflow/www_rbac/yarn.lock
@@ -3818,6 +3818,11 @@ lodash@^4.17.13, lodash@^4.17.19:
   resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.19.tgz#e48ddedbe30b3321783c5b4301fbd353bc1e4a4b"
   integrity sha512-JNvd8XER9GQX0v2qJgsaN/mzFCNA5BRe/j8JN9d+tWyGLSodKQHKFicdwNYzWwI3wjRnaKPsGj1XkBjx/F96DQ==
 
+lodash@^4.17.20:
+  version "4.17.20"
+  resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.20.tgz#b44a9b6297bcb698f1c51a3545a2b3b368d59c52"
+  integrity sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==
+
 log-symbols@^2.2.0:
   version "2.2.0"
   resolved "https://registry.yarnpkg.com/log-symbols/-/log-symbols-2.2.0.tgz#5740e1c5d6f0dfda4ad9323b5332107ef6b4c40a"


[airflow] 03/07: Log instead of raise an Error for unregistered OperatorLinks (#11959)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 97396c72d592ae61d1c3cec38004a633d60f977a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Oct 30 10:39:31 2020 +0000

    Log instead of raise an Error for unregistered OperatorLinks (#11959)
    
    Currently, if someone uses OperatorLinks that are not registered,
    it will break the UI when someone clicks on that DAG.
    
    This commit will instead log an error in the Webserver logs so that
    someone can still see the DAG in different Views (graph, tree, etc).
    
    (cherry picked from commit 44f6e6fca59596a5cdf27ca0910e86a9d8150a63)
---
 airflow/serialization/serialized_objects.py   |  3 ++-
 tests/serialization/test_dag_serialization.py | 36 ++++++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 6d1c30a..857514e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -495,7 +495,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
             elif _operator_link_class_path in registered_operator_link_classes:
                 single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
             else:
-                raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
+                log.error("Operator Link class %r not registered", _operator_link_class_path)
+                return {}
 
             op_predefined_extra_link = cattr.structure(
                 data, single_op_link_class)    # type: BaseOperatorLink
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index a0447b8..80bbe7a 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -33,7 +33,7 @@ from dateutil.relativedelta import relativedelta, FR
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.models import DAG, Connection, DagBag, TaskInstance
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.operators.bash_operator import BashOperator
 from airflow.serialization.json_schema import load_dag_schema_dict
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
@@ -568,6 +568,40 @@ class TestStringifiedDAGs(unittest.TestCase):
         google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name)
         self.assertEqual("https://www.google.com", google_link_from_plugin)
 
+    def test_extra_operator_links_logs_error_for_non_registered_extra_links(self):
+        """
+        Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link,
+        it can still deserialize the DAG (does not error) but just logs an error
+        """
+
+        class TaskStateLink(BaseOperatorLink):
+            """OperatorLink not registered via Plugins nor a built-in OperatorLink"""
+            name = 'My Link'
+
+            def get_link(self, operator, dttm):
+                return 'https://www.google.com'
+
+        class MyOperator(BaseOperator):
+            """Just a DummyOperator using above defined Extra Operator Link"""
+            operator_extra_links = [TaskStateLink()]
+
+            def execute(self, context):
+                pass
+
+        with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1)) as dag:
+            MyOperator(task_id='blah')
+
+        serialized_dag = SerializedDAG.to_dict(dag)
+
+        with self.assertLogs("airflow.serialization.serialized_objects", level="ERROR") as log_output:
+            SerializedDAG.from_dict(serialized_dag)
+            received_logs = log_output.output[0]
+            expected_err_msg = (
+                "Operator Link class 'tests.serialization.test_dag_serialization.TaskStateLink' "
+                "not registered"
+            )
+            assert expected_err_msg in received_logs
+
     def test_extra_serialized_field_and_multiple_operator_links(self):
         """
         Assert extra field exists & OperatorLinks defined in Plugins and inbuilt Operator Links.