You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/16 12:38:33 UTC

[airflow] branch v2-1-test updated: Improve validation of Group id (#17578)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-1-test by this push:
     new 38717bd  Improve validation of Group id (#17578)
38717bd is described below

commit 38717bd5843f54f7106365cb673900bf2f7613dc
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Aug 13 17:33:31 2021 +0200

    Improve validation of Group id (#17578)
    
    When Group id of task group is used to prefix task id, it should
    follow the same limitation that task_id has, plus it should not
    have '.'. The '.' is used to separate groups in task id
    so it should not be allowed in the group id.
    
    If this is not checked at Task Group creation time, users will
    get messages about invalid task id during deserialization
    and it's not entirely obvious where the error came from
    and it crashes the scheduler.
    
    Also this validation will be performed at parsing time, rather
    than at deserialization time and the DAG will not even get
    serialized, so it will not crash the scheduler.
    
    Fixes: #17568
    (cherry picked from commit 833e1094a72b5a09f6b2249001b977538f139a19)
---
 airflow/utils/helpers.py    | 22 ++++++++++---
 airflow/utils/task_group.py | 14 ++++++---
 tests/utils/test_helpers.py | 76 +++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 101 insertions(+), 11 deletions(-)

diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index f74a017..dd32d33 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -32,21 +32,33 @@ from airflow.exceptions import AirflowException
 from airflow.utils.module_loading import import_string
 
 KEY_REGEX = re.compile(r'^[\w.-]+$')
+GROUP_KEY_REGEX = re.compile(r'^[\w-]+$')
+CAMELCASE_TO_SNAKE_CASE_REGEX = re.compile(r'(?!^)([A-Z]+)')
 
 
 def validate_key(k, max_length=250):
     """Validates value used as a key."""
     if not isinstance(k, str):
-        raise TypeError("The key has to be a string")
-    elif len(k) > max_length:
+        raise TypeError(f"The key has to be a string and is {type(k)}:{k}")
+    if len(k) > max_length:
         raise AirflowException(f"The key has to be less than {max_length} characters")
-    elif not KEY_REGEX.match(k):
+    if not KEY_REGEX.match(k):
         raise AirflowException(
             "The key ({k}) has to be made of alphanumeric characters, dashes, "
             "dots and underscores exclusively".format(k=k)
         )
-    else:
-        return True
+
+
+def validate_group_key(k: str, max_length: int = 200):
+    """Validates value used as a group key."""
+    if not isinstance(k, str):
+        raise TypeError(f"The key has to be a string and is {type(k)}:{k}")
+    if len(k) > max_length:
+        raise AirflowException(f"The key has to be less than {max_length} characters")
+    if not GROUP_KEY_REGEX.match(k):
+        raise AirflowException(
+            f"The key ({k}) has to be made of alphanumeric characters, dashes " "and underscores exclusively"
+        )
 
 
 def alchemy_to_dict(obj: Any) -> Optional[Dict]:
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 90afa07..36d8c7f 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set
 
 from airflow.exceptions import AirflowException, DuplicateTaskIdFound
 from airflow.models.taskmixin import TaskMixin
+from airflow.utils.helpers import validate_group_key
 
 if TYPE_CHECKING:
     from airflow.models.baseoperator import BaseOperator
@@ -83,10 +84,15 @@ class TaskGroup(TaskMixin):
             self.used_group_ids: Set[Optional[str]] = set()
             self._parent_group = None
         else:
-            if not isinstance(group_id, str):
-                raise ValueError("group_id must be str")
-            if not group_id:
-                raise ValueError("group_id must not be empty")
+            if prefix_group_id:
+                # If group id is used as prefix, it should not contain spaces nor dots
+                # because it is used as prefix in the task_id
+                validate_group_key(group_id)
+            else:
+                if not isinstance(group_id, str):
+                    raise ValueError("group_id must be str")
+                if not group_id:
+                    raise ValueError("group_id must not be empty")
 
             dag = dag or DagContext.get_current_dag()
 
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index bba2d95..a2aa10b 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -15,17 +15,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import re
 import unittest
 from datetime import datetime
 
 import pytest
+from parameterized import parameterized
 
+from airflow import AirflowException
 from airflow.models import TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import helpers
-from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts
+from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts, validate_group_key, validate_key
 from tests.test_utils.config import conf_vars
 
 
@@ -154,3 +156,73 @@ class TestHelpers(unittest.TestCase):
 
         with cached_app(testing=True).test_request_context():
             assert build_airflow_url_with_query(query) == expected_url
+
+    @parameterized.expand(
+        [
+            (3, "The key has to be a string and is <class 'int'>:3", TypeError),
+            (None, "The key has to be a string and is <class 'NoneType'>:None", TypeError),
+            ("simple_key", None, None),
+            ("simple-key", None, None),
+            ("group.simple_key", None, None),
+            ("root.group.simple-key", None, None),
+            (
+                "key with space",
+                "The key (key with space) has to be made of alphanumeric "
+                "characters, dashes, dots and underscores exclusively",
+                AirflowException,
+            ),
+            (
+                "key_with_!",
+                "The key (key_with_!) has to be made of alphanumeric "
+                "characters, dashes, dots and underscores exclusively",
+                AirflowException,
+            ),
+            (' ' * 251, "The key has to be less than 250 characters", AirflowException),
+        ]
+    )
+    def test_validate_key(self, key_id, message, exception):
+        if message:
+            with pytest.raises(exception, match=re.escape(message)):
+                validate_key(key_id)
+        else:
+            validate_key(key_id)
+
+    @parameterized.expand(
+        [
+            (3, "The key has to be a string and is <class 'int'>:3", TypeError),
+            (None, "The key has to be a string and is <class 'NoneType'>:None", TypeError),
+            ("simple_key", None, None),
+            ("simple-key", None, None),
+            (
+                "group.simple_key",
+                "The key (group.simple_key) has to be made of alphanumeric "
+                "characters, dashes and underscores exclusively",
+                AirflowException,
+            ),
+            (
+                "root.group-name.simple_key",
+                "The key (root.group-name.simple_key) has to be made of alphanumeric "
+                "characters, dashes and underscores exclusively",
+                AirflowException,
+            ),
+            (
+                "key with space",
+                "The key (key with space) has to be made of alphanumeric "
+                "characters, dashes and underscores exclusively",
+                AirflowException,
+            ),
+            (
+                "key_with_!",
+                "The key (key_with_!) has to be made of alphanumeric "
+                "characters, dashes and underscores exclusively",
+                AirflowException,
+            ),
+            (' ' * 201, "The key has to be less than 200 characters", AirflowException),
+        ]
+    )
+    def test_validate_group_key(self, key_id, message, exception):
+        if message:
+            with pytest.raises(exception, match=re.escape(message)):
+                validate_group_key(key_id)
+        else:
+            validate_group_key(key_id)