You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/20 03:02:42 UTC

[airflow] branch v1-10-test updated (4d8c4e2 -> f2ee8e8)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 4d8c4e2  remove core_to_contrib
 discard 5091334  remove core_to_contrib
 discard f56eeab  remote core_to_contrib
 discard 0fa10db  alembic fix
 discard a873e85  ASf
    omit eb15571  flake8 pass Merging multiple sql operators (#9124)
     new 212a4c8  flake8 pass Merging multiple sql operators (#9124)
     new 2208485  alembic fix
     new db795a4  remove core_to_contrib
     new f2ee8e8  flake8 tests

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4d8c4e2)
            \
             N -- N -- N   refs/heads/v1-10-test (f2ee8e8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/operators/sql.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)


[airflow] 03/04: remove core_to_contrib

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit db795a4e43778dd2f18ccfa3ab054e9a2ad823f9
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 19:35:47 2020 -0700

    remove core_to_contrib
---
 airflow/operators/sql.py      |  16 +++---
 tests/test_core_to_contrib.py | 129 ------------------------------------------
 2 files changed, 8 insertions(+), 137 deletions(-)

diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 83cb201..91ddc1a 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -154,14 +154,14 @@ class SQLValueCheckOperator(BaseOperator):
 
     @apply_defaults
     def __init__(
-        self,
-        sql,
-        pass_value,
-        tolerance=None,
-        conn_id=None,
-        *args,
-        **kwargs
-    ):
+            self,
+            sql,
+            pass_value,
+            tolerance=None,
+            conn_id=None,
+            *args,
+            **kwargs
+            ):
         super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py
deleted file mode 100644
index 127905a..0000000
--- a/tests/test_core_to_contrib.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import importlib
-import sys
-from inspect import isabstract
-from unittest import TestCase, mock
-
-from parameterized import parameterized
-
-
-OPERATORS = [
-    (
-        'airflow.operators.check_operator.CheckOperator',
-        'airflow.operators.sql.SQLCheckOperator',
-    ),
-    (
-        'airflow.operators.check_operator.IntervalCheckOperator',
-        'airflow.operators.sql.SQLIntervalCheckOperator',
-    ),
-    (
-        'airflow.operators.check_operator.ValueCheckOperator',
-        'airflow.operators.sql.SQLValueCheckOperator',
-    ),
-    (
-        'airflow.operators.check_operator.ThresholdCheckOperator',
-        'airflow.operators.sql.SQLThresholdCheckOperator',
-    ),
-    (
-        'airflow.operators.sql_branch_operator.BranchSqlOperator',
-        'airflow.operators.sql.BranchSQLOperator',
-    ),
-]
-
-
-ALL = OPERATORS
-
-RENAMED_HOOKS = [
-    (old_class, new_class)
-    for old_class, new_class in OPERATORS
-    if old_class.rpartition(".")[2] != new_class.rpartition(".")[2]
-]
-
-
-class TestMovingCoreToContrib(TestCase):
-    @staticmethod
-    def assert_warning(msg, warning):
-        error = "Text '{}' not in warnings".format(msg)
-        assert any(msg in str(w) for w in warning.warnings), error
-
-    def assert_is_subclass(self, clazz, other):
-        self.assertTrue(
-            issubclass(clazz, other), "{} is not subclass of {}".format(clazz, other)
-        )
-
-    def assert_proper_import(self, old_resource, new_resource):
-        new_path, _, _ = new_resource.rpartition(".")
-        old_path, _, _ = old_resource.rpartition(".")
-        with self.assertWarns(DeprecationWarning) as warning_msg:
-            # Reload to see deprecation warning each time
-            importlib.reload(importlib.import_module(old_path))
-            self.assert_warning(new_path, warning_msg)
-
-    def skip_test_with_mssql_in_py38(self, path_a="", path_b=""):
-        py_38 = sys.version_info >= (3, 8)
-        if py_38:
-            if "mssql" in path_a or "mssql" in path_b:
-                raise self.skipTest("Mssql package not avaible when Python >= 3.8.")
-
-    @staticmethod
-    def get_class_from_path(path_to_class, parent=False):
-        """
-        :param parent indicates if "path_to_class" arg is super class
-        """
-
-        path, _, class_name = path_to_class.rpartition(".")
-        module = importlib.import_module(path)
-        class_ = getattr(module, class_name)
-
-        if isabstract(class_) and not parent:
-            class_name = "Mock({class_name})".format(class_name=class_.__name__)
-
-            attributes = {
-                a: mock.MagicMock() for a in class_.__abstractmethods__
-            }
-
-            new_class = type(class_name, (class_,), attributes)
-            return new_class
-        return class_
-
-    @parameterized.expand(RENAMED_HOOKS)
-    def test_is_class_deprecated(self, new_module, old_module):
-        self.skip_test_with_mssql_in_py38(new_module, old_module)
-        deprecation_warning_msg = "This class is deprecated."
-        old_module_class = self.get_class_from_path(old_module)
-        with self.assertWarnsRegex(DeprecationWarning, deprecation_warning_msg) as wrn:
-            with mock.patch("{}.__init__".format(new_module)) as init_mock:
-                init_mock.return_value = None
-                self.assertTrue(deprecation_warning_msg, wrn)
-                old_module_class()
-                init_mock.assert_called_once_with()
-
-    @parameterized.expand(ALL)
-    def test_is_subclass(self, parent_class_path, sub_class_path):
-        self.skip_test_with_mssql_in_py38(parent_class_path, sub_class_path)
-        with mock.patch("{}.__init__".format(parent_class_path)):
-            parent_class_path = self.get_class_from_path(parent_class_path, parent=True)
-            sub_class_path = self.get_class_from_path(sub_class_path)
-            self.assert_is_subclass(sub_class_path, parent_class_path)
-
-    @parameterized.expand(ALL)
-    def test_warning_on_import(self, new_path, old_path):
-        self.skip_test_with_mssql_in_py38(new_path, old_path)
-        self.assert_proper_import(old_path, new_path)


[airflow] 02/04: alembic fix

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 220848589b193c1a4f8216ae83e78172e53a8b81
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 17:27:27 2020 -0700

    alembic fix
---
 .../versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
index ecb589d..59098a8 100644
--- a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
+++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
@@ -29,7 +29,7 @@ from sqlalchemy.dialects import mysql
 
 # revision identifiers, used by Alembic.
 revision = 'a66efa278eea'
-down_revision = '8f966b9c467a'
+down_revision = '952da73b5eff'
 branch_labels = None
 depends_on = None
 


[airflow] 04/04: flake8 tests

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f2ee8e868ff969860d3ba5c47f4f1c04808543ca
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 20:01:25 2020 -0700

    flake8 tests
---
 airflow/operators/sql.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 91ddc1a..83cb201 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -154,14 +154,14 @@ class SQLValueCheckOperator(BaseOperator):
 
     @apply_defaults
     def __init__(
-            self,
-            sql,
-            pass_value,
-            tolerance=None,
-            conn_id=None,
-            *args,
-            **kwargs
-            ):
+        self,
+        sql,
+        pass_value,
+        tolerance=None,
+        conn_id=None,
+        *args,
+        **kwargs
+    ):
         super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id


[airflow] 01/04: flake8 pass Merging multiple sql operators (#9124)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 212a4c8c80ca8fdb0e451a54d8665802406fa74a
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 15:15:50 2020 -0700

    flake8 pass Merging multiple sql operators (#9124)
---
 airflow/operators/check_operator.py      |  8 +--
 airflow/operators/sql.py                 | 93 +++++++++++++++++---------------
 airflow/operators/sql_branch_operator.py |  2 +-
 tests/operators/test_check_operator.py   |  0
 tests/operators/test_sql.py              |  5 +-
 tests/test_core_to_contrib.py            | 15 ++----
 6 files changed, 61 insertions(+), 62 deletions(-)

diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index 4810eeb..12ac472 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -38,7 +38,7 @@ class CheckOperator(SQLCheckOperator):
             Please use `airflow.operators.sql.SQLCheckOperator`.""",
             DeprecationWarning, stacklevel=2
         )
-        super().__init__(*args, **kwargs)
+        super(CheckOperator, self).__init__(*args, **kwargs)
 
 
 class IntervalCheckOperator(SQLIntervalCheckOperator):
@@ -53,7 +53,7 @@ class IntervalCheckOperator(SQLIntervalCheckOperator):
             Please use `airflow.operators.sql.SQLIntervalCheckOperator`.""",
             DeprecationWarning, stacklevel=2
         )
-        super().__init__(*args, **kwargs)
+        super(IntervalCheckOperator, self).__init__(*args, **kwargs)
 
 
 class ThresholdCheckOperator(SQLThresholdCheckOperator):
@@ -68,7 +68,7 @@ class ThresholdCheckOperator(SQLThresholdCheckOperator):
             Please use `airflow.operators.sql.SQLThresholdCheckOperator`.""",
             DeprecationWarning, stacklevel=2
         )
-        super().__init__(*args, **kwargs)
+        super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
 
 
 class ValueCheckOperator(SQLValueCheckOperator):
@@ -83,4 +83,4 @@ class ValueCheckOperator(SQLValueCheckOperator):
             Please use `airflow.operators.sql.SQLValueCheckOperator`.""",
             DeprecationWarning, stacklevel=2
         )
-        super().__init__(*args, **kwargs)
+        super(ValueCheckOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index fd997d9..83cb201 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 from distutils.util import strtobool
-from typing import Any, Dict, Iterable, List, Mapping, Optional, SupportsAbs, Union
+from typing import Iterable
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -82,9 +82,9 @@ class SQLCheckOperator(BaseOperator):
 
     @apply_defaults
     def __init__(
-        self, sql: str, conn_id: Optional[str] = None, *args, **kwargs
-    ) -> None:
-        super().__init__(*args, **kwargs)
+        self, sql, conn_id=None, *args, **kwargs
+    ):
+        super(SQLCheckOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
         self.sql = sql
 
@@ -155,14 +155,14 @@ class SQLValueCheckOperator(BaseOperator):
     @apply_defaults
     def __init__(
         self,
-        sql: str,
-        pass_value: Any,
-        tolerance: Any = None,
-        conn_id: Optional[str] = None,
+        sql,
+        pass_value,
+        tolerance=None,
+        conn_id=None,
         *args,
-        **kwargs,
+        **kwargs
     ):
-        super().__init__(*args, **kwargs)
+        super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.pass_value = str(pass_value)
@@ -278,17 +278,17 @@ class SQLIntervalCheckOperator(BaseOperator):
     @apply_defaults
     def __init__(
         self,
-        table: str,
-        metrics_thresholds: Dict[str, int],
-        date_filter_column: Optional[str] = "ds",
-        days_back: SupportsAbs[int] = -7,
-        ratio_formula: Optional[str] = "max_over_min",
-        ignore_zero: Optional[bool] = True,
-        conn_id: Optional[str] = None,
+        table,
+        metrics_thresholds,
+        date_filter_column="ds",
+        days_back=-7,
+        ratio_formula="max_over_min",
+        ignore_zero=True,
+        conn_id=None,
         *args,
-        **kwargs,
+        **kwargs
     ):
-        super().__init__(*args, **kwargs)
+        super(SQLIntervalCheckOperator, self).__init__(*args, **kwargs)
         if ratio_formula not in self.ratio_formulas:
             msg_template = (
                 "Invalid diff_method: {diff_method}. "
@@ -423,14 +423,14 @@ class SQLThresholdCheckOperator(BaseOperator):
     @apply_defaults
     def __init__(
         self,
-        sql: str,
-        min_threshold: Any,
-        max_threshold: Any,
-        conn_id: Optional[str] = None,
+        sql,
+        min_threshold,
+        max_threshold,
+        conn_id=None,
         *args,
-        **kwargs,
+        **kwargs
     ):
-        super().__init__(*args, **kwargs)
+        super(SQLThresholdCheckOperator, self).__init__(*args, **kwargs)
         self.sql = sql
         self.conn_id = conn_id
         self.min_threshold = _convert_to_float_if_possible(min_threshold)
@@ -461,13 +461,20 @@ class SQLThresholdCheckOperator(BaseOperator):
         self.push(meta_data)
         if not meta_data["within_threshold"]:
             error_msg = (
-                f'Threshold Check: "{meta_data.get("task_id")}" failed.\n'
-                f'DAG: {self.dag_id}\nTask_id: {meta_data.get("task_id")}\n'
-                f'Check description: {meta_data.get("description")}\n'
-                f"SQL: {self.sql}\n"
-                f'Result: {round(meta_data.get("result"), 2)} is not within thresholds '
-                f'{meta_data.get("min_threshold")} and {meta_data.get("max_threshold")}'
-            )
+                'Threshold Check: "{task_id}" failed.\n'
+                'DAG: {dag_id}\nTask_id: {task_id}\n'
+                'Check description: {description}\n'
+                "SQL: {sql}\n"
+                'Result: {round} is not within thresholds '
+                '{min} and {max}'
+                .format(task_id=meta_data.get("task_id"),
+                        dag_id=self.dag_id,
+                        description=meta_data.get("description"),
+                        sql=self.sql,
+                        round=round(meta_data.get("result"), 2),
+                        min=meta_data.get("min_threshold"),
+                        max=meta_data.get("max_threshold"),
+                        ))
             raise AirflowException(error_msg)
 
         self.log.info("Test %s Successful.", self.task_id)
@@ -478,7 +485,7 @@ class SQLThresholdCheckOperator(BaseOperator):
         Default functionality will log metadata.
         """
 
-        info = "\n".join([f"""{key}: {item}""" for key, item in meta_data.items()])
+        info = "\n".join(["{key}: {item}".format(key=key, item=item) for key, item in meta_data.items()])
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
     def get_db_hook(self):
@@ -516,16 +523,16 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
     @apply_defaults
     def __init__(
         self,
-        sql: str,
-        follow_task_ids_if_true: List[str],
-        follow_task_ids_if_false: List[str],
-        conn_id: str = "default_conn_id",
-        database: Optional[str] = None,
-        parameters: Optional[Union[Mapping, Iterable]] = None,
+        sql,
+        follow_task_ids_if_true,
+        follow_task_ids_if_false,
+        conn_id="default_conn_id",
+        database=None,
+        parameters=None,
         *args,
-        **kwargs,
-    ) -> None:
-        super().__init__(*args, **kwargs)
+        **kwargs
+    ):
+        super(BranchSQLOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
         self.sql = sql
         self.parameters = parameters
@@ -551,7 +558,7 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
 
         return self._hook
 
-    def execute(self, context: Dict):
+    def execute(self, context):
         # get supported hook
         self._hook = self._get_hook()
 
diff --git a/airflow/operators/sql_branch_operator.py b/airflow/operators/sql_branch_operator.py
index cd319aa..b911e34 100644
--- a/airflow/operators/sql_branch_operator.py
+++ b/airflow/operators/sql_branch_operator.py
@@ -32,4 +32,4 @@ class BranchSqlOperator(BranchSQLOperator):
             Please use `airflow.operators.sql.BranchSQLOperator`.""",
             DeprecationWarning, stacklevel=2
         )
-        super().__init__(*args, **kwargs)
+        super(BranchSqlOperator, self).__init__(*args, **kwargs)
diff --git a/tests/operators/test_check_operator.py b/tests/operators/test_check_operator.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/operators/test_sql.py b/tests/operators/test_sql.py
index a538f15..e5c1f98 100644
--- a/tests/operators/test_sql.py
+++ b/tests/operators/test_sql.py
@@ -200,8 +200,7 @@ class TestIntervalCheckOperator(unittest.TestCase):
                 [2, 2, 2, 2],  # reference
                 [1, 1, 1, 1],  # current
             ]
-
-            yield from rows
+            return rows
 
         mock_hook.get_first.side_effect = returned_row()
         mock_get_db_hook.return_value = mock_hook
@@ -226,7 +225,7 @@ class TestIntervalCheckOperator(unittest.TestCase):
                 [1, 1, 1, 1],  # current
             ]
 
-            yield from rows
+            return rows
 
         mock_hook.get_first.side_effect = returned_row()
         mock_get_db_hook.return_value = mock_hook
diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py
index 0a3e7fb..127905a 100644
--- a/tests/test_core_to_contrib.py
+++ b/tests/test_core_to_contrib.py
@@ -19,12 +19,10 @@
 import importlib
 import sys
 from inspect import isabstract
-from typing import Any
 from unittest import TestCase, mock
 
 from parameterized import parameterized
 
-HOOKS = []
 
 OPERATORS = [
     (
@@ -49,24 +47,19 @@ OPERATORS = [
     ),
 ]
 
-SECRETS = []
 
-SENSORS = []
-
-TRANSFERS = []
-
-ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS
+ALL = OPERATORS
 
 RENAMED_HOOKS = [
     (old_class, new_class)
-    for old_class, new_class in HOOKS + OPERATORS + SECRETS + SENSORS
+    for old_class, new_class in OPERATORS
     if old_class.rpartition(".")[2] != new_class.rpartition(".")[2]
 ]
 
 
 class TestMovingCoreToContrib(TestCase):
     @staticmethod
-    def assert_warning(msg: str, warning: Any):
+    def assert_warning(msg, warning):
         error = "Text '{}' not in warnings".format(msg)
         assert any(msg in str(w) for w in warning.warnings), error
 
@@ -100,7 +93,7 @@ class TestMovingCoreToContrib(TestCase):
         class_ = getattr(module, class_name)
 
         if isabstract(class_) and not parent:
-            class_name = f"Mock({class_.__name__})"
+            class_name = "Mock({class_name})".format(class_name=class_.__name__)
 
             attributes = {
                 a: mock.MagicMock() for a in class_.__abstractmethods__