You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/20 03:02:42 UTC
[airflow] branch v1-10-test updated (4d8c4e2 -> f2ee8e8)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.
discard 4d8c4e2 remove core_to_contrib
discard 5091334 remove core_to_contrib
discard f56eeab remote core_to_contrib
discard 0fa10db alembic fix
discard a873e85 ASf
omit eb15571 flake8 pass Merging multiple sql operators (#9124)
new 212a4c8 flake8 pass Merging multiple sql operators (#9124)
new 2208485 alembic fix
new db795a4 remove core_to_contrib
new f2ee8e8 flake8 tests
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (4d8c4e2)
\
N -- N -- N refs/heads/v1-10-test (f2ee8e8)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
airflow/operators/sql.py | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
[airflow] 03/04: remove core_to_contrib
Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit db795a4e43778dd2f18ccfa3ab054e9a2ad823f9
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 19:35:47 2020 -0700
remove core_to_contrib
---
airflow/operators/sql.py | 16 +++---
tests/test_core_to_contrib.py | 129 ------------------------------------------
2 files changed, 8 insertions(+), 137 deletions(-)
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 83cb201..91ddc1a 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -154,14 +154,14 @@ class SQLValueCheckOperator(BaseOperator):
@apply_defaults
def __init__(
- self,
- sql,
- pass_value,
- tolerance=None,
- conn_id=None,
- *args,
- **kwargs
- ):
+ self,
+ sql,
+ pass_value,
+ tolerance=None,
+ conn_id=None,
+ *args,
+ **kwargs
+ ):
super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.conn_id = conn_id
diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py
deleted file mode 100644
index 127905a..0000000
--- a/tests/test_core_to_contrib.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import importlib
-import sys
-from inspect import isabstract
-from unittest import TestCase, mock
-
-from parameterized import parameterized
-
-
-OPERATORS = [
- (
- 'airflow.operators.check_operator.CheckOperator',
- 'airflow.operators.sql.SQLCheckOperator',
- ),
- (
- 'airflow.operators.check_operator.IntervalCheckOperator',
- 'airflow.operators.sql.SQLIntervalCheckOperator',
- ),
- (
- 'airflow.operators.check_operator.ValueCheckOperator',
- 'airflow.operators.sql.SQLValueCheckOperator',
- ),
- (
- 'airflow.operators.check_operator.ThresholdCheckOperator',
- 'airflow.operators.sql.SQLThresholdCheckOperator',
- ),
- (
- 'airflow.operators.sql_branch_operator.BranchSqlOperator',
- 'airflow.operators.sql.BranchSQLOperator',
- ),
-]
-
-
-ALL = OPERATORS
-
-RENAMED_HOOKS = [
- (old_class, new_class)
- for old_class, new_class in OPERATORS
- if old_class.rpartition(".")[2] != new_class.rpartition(".")[2]
-]
-
-
-class TestMovingCoreToContrib(TestCase):
- @staticmethod
- def assert_warning(msg, warning):
- error = "Text '{}' not in warnings".format(msg)
- assert any(msg in str(w) for w in warning.warnings), error
-
- def assert_is_subclass(self, clazz, other):
- self.assertTrue(
- issubclass(clazz, other), "{} is not subclass of {}".format(clazz, other)
- )
-
- def assert_proper_import(self, old_resource, new_resource):
- new_path, _, _ = new_resource.rpartition(".")
- old_path, _, _ = old_resource.rpartition(".")
- with self.assertWarns(DeprecationWarning) as warning_msg:
- # Reload to see deprecation warning each time
- importlib.reload(importlib.import_module(old_path))
- self.assert_warning(new_path, warning_msg)
-
- def skip_test_with_mssql_in_py38(self, path_a="", path_b=""):
- py_38 = sys.version_info >= (3, 8)
- if py_38:
- if "mssql" in path_a or "mssql" in path_b:
- raise self.skipTest("Mssql package not avaible when Python >= 3.8.")
-
- @staticmethod
- def get_class_from_path(path_to_class, parent=False):
- """
- :param parent indicates if "path_to_class" arg is super class
- """
-
- path, _, class_name = path_to_class.rpartition(".")
- module = importlib.import_module(path)
- class_ = getattr(module, class_name)
-
- if isabstract(class_) and not parent:
- class_name = "Mock({class_name})".format(class_name=class_.__name__)
-
- attributes = {
- a: mock.MagicMock() for a in class_.__abstractmethods__
- }
-
- new_class = type(class_name, (class_,), attributes)
- return new_class
- return class_
-
- @parameterized.expand(RENAMED_HOOKS)
- def test_is_class_deprecated(self, new_module, old_module):
- self.skip_test_with_mssql_in_py38(new_module, old_module)
- deprecation_warning_msg = "This class is deprecated."
- old_module_class = self.get_class_from_path(old_module)
- with self.assertWarnsRegex(DeprecationWarning, deprecation_warning_msg) as wrn:
- with mock.patch("{}.__init__".format(new_module)) as init_mock:
- init_mock.return_value = None
- self.assertTrue(deprecation_warning_msg, wrn)
- old_module_class()
- init_mock.assert_called_once_with()
-
- @parameterized.expand(ALL)
- def test_is_subclass(self, parent_class_path, sub_class_path):
- self.skip_test_with_mssql_in_py38(parent_class_path, sub_class_path)
- with mock.patch("{}.__init__".format(parent_class_path)):
- parent_class_path = self.get_class_from_path(parent_class_path, parent=True)
- sub_class_path = self.get_class_from_path(sub_class_path)
- self.assert_is_subclass(sub_class_path, parent_class_path)
-
- @parameterized.expand(ALL)
- def test_warning_on_import(self, new_path, old_path):
- self.skip_test_with_mssql_in_py38(new_path, old_path)
- self.assert_proper_import(old_path, new_path)
[airflow] 02/04: alembic fix
Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 220848589b193c1a4f8216ae83e78172e53a8b81
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 17:27:27 2020 -0700
alembic fix
---
.../versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
index ecb589d..59098a8 100644
--- a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
+++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py
@@ -29,7 +29,7 @@ from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'a66efa278eea'
-down_revision = '8f966b9c467a'
+down_revision = '952da73b5eff'
branch_labels = None
depends_on = None
[airflow] 04/04: flake8 tests
Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f2ee8e868ff969860d3ba5c47f4f1c04808543ca
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 20:01:25 2020 -0700
flake8 tests
---
airflow/operators/sql.py | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 91ddc1a..83cb201 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -154,14 +154,14 @@ class SQLValueCheckOperator(BaseOperator):
@apply_defaults
def __init__(
- self,
- sql,
- pass_value,
- tolerance=None,
- conn_id=None,
- *args,
- **kwargs
- ):
+ self,
+ sql,
+ pass_value,
+ tolerance=None,
+ conn_id=None,
+ *args,
+ **kwargs
+ ):
super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.conn_id = conn_id
[airflow] 01/04: flake8 pass Merging multiple sql operators (#9124)
Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 212a4c8c80ca8fdb0e451a54d8665802406fa74a
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Fri Jun 19 15:15:50 2020 -0700
flake8 pass Merging multiple sql operators (#9124)
---
airflow/operators/check_operator.py | 8 +--
airflow/operators/sql.py | 93 +++++++++++++++++---------------
airflow/operators/sql_branch_operator.py | 2 +-
tests/operators/test_check_operator.py | 0
tests/operators/test_sql.py | 5 +-
tests/test_core_to_contrib.py | 15 ++----
6 files changed, 61 insertions(+), 62 deletions(-)
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index 4810eeb..12ac472 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -38,7 +38,7 @@ class CheckOperator(SQLCheckOperator):
Please use `airflow.operators.sql.SQLCheckOperator`.""",
DeprecationWarning, stacklevel=2
)
- super().__init__(*args, **kwargs)
+ super(CheckOperator, self).__init__(*args, **kwargs)
class IntervalCheckOperator(SQLIntervalCheckOperator):
@@ -53,7 +53,7 @@ class IntervalCheckOperator(SQLIntervalCheckOperator):
Please use `airflow.operators.sql.SQLIntervalCheckOperator`.""",
DeprecationWarning, stacklevel=2
)
- super().__init__(*args, **kwargs)
+ super(IntervalCheckOperator, self).__init__(*args, **kwargs)
class ThresholdCheckOperator(SQLThresholdCheckOperator):
@@ -68,7 +68,7 @@ class ThresholdCheckOperator(SQLThresholdCheckOperator):
Please use `airflow.operators.sql.SQLThresholdCheckOperator`.""",
DeprecationWarning, stacklevel=2
)
- super().__init__(*args, **kwargs)
+ super(ThresholdCheckOperator, self).__init__(*args, **kwargs)
class ValueCheckOperator(SQLValueCheckOperator):
@@ -83,4 +83,4 @@ class ValueCheckOperator(SQLValueCheckOperator):
Please use `airflow.operators.sql.SQLValueCheckOperator`.""",
DeprecationWarning, stacklevel=2
)
- super().__init__(*args, **kwargs)
+ super(ValueCheckOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index fd997d9..83cb201 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
from distutils.util import strtobool
-from typing import Any, Dict, Iterable, List, Mapping, Optional, SupportsAbs, Union
+from typing import Iterable
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@@ -82,9 +82,9 @@ class SQLCheckOperator(BaseOperator):
@apply_defaults
def __init__(
- self, sql: str, conn_id: Optional[str] = None, *args, **kwargs
- ) -> None:
- super().__init__(*args, **kwargs)
+ self, sql, conn_id=None, *args, **kwargs
+ ):
+ super(SQLCheckOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
self.sql = sql
@@ -155,14 +155,14 @@ class SQLValueCheckOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- sql: str,
- pass_value: Any,
- tolerance: Any = None,
- conn_id: Optional[str] = None,
+ sql,
+ pass_value,
+ tolerance=None,
+ conn_id=None,
*args,
- **kwargs,
+ **kwargs
):
- super().__init__(*args, **kwargs)
+ super(SQLValueCheckOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.conn_id = conn_id
self.pass_value = str(pass_value)
@@ -278,17 +278,17 @@ class SQLIntervalCheckOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- table: str,
- metrics_thresholds: Dict[str, int],
- date_filter_column: Optional[str] = "ds",
- days_back: SupportsAbs[int] = -7,
- ratio_formula: Optional[str] = "max_over_min",
- ignore_zero: Optional[bool] = True,
- conn_id: Optional[str] = None,
+ table,
+ metrics_thresholds,
+ date_filter_column="ds",
+ days_back=-7,
+ ratio_formula="max_over_min",
+ ignore_zero=True,
+ conn_id=None,
*args,
- **kwargs,
+ **kwargs
):
- super().__init__(*args, **kwargs)
+ super(SQLIntervalCheckOperator, self).__init__(*args, **kwargs)
if ratio_formula not in self.ratio_formulas:
msg_template = (
"Invalid diff_method: {diff_method}. "
@@ -423,14 +423,14 @@ class SQLThresholdCheckOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- sql: str,
- min_threshold: Any,
- max_threshold: Any,
- conn_id: Optional[str] = None,
+ sql,
+ min_threshold,
+ max_threshold,
+ conn_id=None,
*args,
- **kwargs,
+ **kwargs
):
- super().__init__(*args, **kwargs)
+ super(SQLThresholdCheckOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.conn_id = conn_id
self.min_threshold = _convert_to_float_if_possible(min_threshold)
@@ -461,13 +461,20 @@ class SQLThresholdCheckOperator(BaseOperator):
self.push(meta_data)
if not meta_data["within_threshold"]:
error_msg = (
- f'Threshold Check: "{meta_data.get("task_id")}" failed.\n'
- f'DAG: {self.dag_id}\nTask_id: {meta_data.get("task_id")}\n'
- f'Check description: {meta_data.get("description")}\n'
- f"SQL: {self.sql}\n"
- f'Result: {round(meta_data.get("result"), 2)} is not within thresholds '
- f'{meta_data.get("min_threshold")} and {meta_data.get("max_threshold")}'
- )
+ 'Threshold Check: "{task_id}" failed.\n'
+ 'DAG: {dag_id}\nTask_id: {task_id}\n'
+ 'Check description: {description}\n'
+ "SQL: {sql}\n"
+ 'Result: {round} is not within thresholds '
+ '{min} and {max}'
+ .format(task_id=meta_data.get("task_id"),
+ dag_id=self.dag_id,
+ description=meta_data.get("description"),
+ sql=self.sql,
+ round=round(meta_data.get("result"), 2),
+ min=meta_data.get("min_threshold"),
+ max=meta_data.get("max_threshold"),
+ ))
raise AirflowException(error_msg)
self.log.info("Test %s Successful.", self.task_id)
@@ -478,7 +485,7 @@ class SQLThresholdCheckOperator(BaseOperator):
Default functionality will log metadata.
"""
- info = "\n".join([f"""{key}: {item}""" for key, item in meta_data.items()])
+ info = "\n".join(["{key}: {item}".format(key=key, item=item) for key, item in meta_data.items()])
self.log.info("Log from %s:\n%s", self.dag_id, info)
def get_db_hook(self):
@@ -516,16 +523,16 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
@apply_defaults
def __init__(
self,
- sql: str,
- follow_task_ids_if_true: List[str],
- follow_task_ids_if_false: List[str],
- conn_id: str = "default_conn_id",
- database: Optional[str] = None,
- parameters: Optional[Union[Mapping, Iterable]] = None,
+ sql,
+ follow_task_ids_if_true,
+ follow_task_ids_if_false,
+ conn_id="default_conn_id",
+ database=None,
+ parameters=None,
*args,
- **kwargs,
- ) -> None:
- super().__init__(*args, **kwargs)
+ **kwargs
+ ):
+ super(BranchSQLOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
self.sql = sql
self.parameters = parameters
@@ -551,7 +558,7 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
return self._hook
- def execute(self, context: Dict):
+ def execute(self, context):
# get supported hook
self._hook = self._get_hook()
diff --git a/airflow/operators/sql_branch_operator.py b/airflow/operators/sql_branch_operator.py
index cd319aa..b911e34 100644
--- a/airflow/operators/sql_branch_operator.py
+++ b/airflow/operators/sql_branch_operator.py
@@ -32,4 +32,4 @@ class BranchSqlOperator(BranchSQLOperator):
Please use `airflow.operators.sql.BranchSQLOperator`.""",
DeprecationWarning, stacklevel=2
)
- super().__init__(*args, **kwargs)
+ super(BranchSqlOperator, self).__init__(*args, **kwargs)
diff --git a/tests/operators/test_check_operator.py b/tests/operators/test_check_operator.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/operators/test_sql.py b/tests/operators/test_sql.py
index a538f15..e5c1f98 100644
--- a/tests/operators/test_sql.py
+++ b/tests/operators/test_sql.py
@@ -200,8 +200,7 @@ class TestIntervalCheckOperator(unittest.TestCase):
[2, 2, 2, 2], # reference
[1, 1, 1, 1], # current
]
-
- yield from rows
+ return rows
mock_hook.get_first.side_effect = returned_row()
mock_get_db_hook.return_value = mock_hook
@@ -226,7 +225,7 @@ class TestIntervalCheckOperator(unittest.TestCase):
[1, 1, 1, 1], # current
]
- yield from rows
+ return rows
mock_hook.get_first.side_effect = returned_row()
mock_get_db_hook.return_value = mock_hook
diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py
index 0a3e7fb..127905a 100644
--- a/tests/test_core_to_contrib.py
+++ b/tests/test_core_to_contrib.py
@@ -19,12 +19,10 @@
import importlib
import sys
from inspect import isabstract
-from typing import Any
from unittest import TestCase, mock
from parameterized import parameterized
-HOOKS = []
OPERATORS = [
(
@@ -49,24 +47,19 @@ OPERATORS = [
),
]
-SECRETS = []
-SENSORS = []
-
-TRANSFERS = []
-
-ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS
+ALL = OPERATORS
RENAMED_HOOKS = [
(old_class, new_class)
- for old_class, new_class in HOOKS + OPERATORS + SECRETS + SENSORS
+ for old_class, new_class in OPERATORS
if old_class.rpartition(".")[2] != new_class.rpartition(".")[2]
]
class TestMovingCoreToContrib(TestCase):
@staticmethod
- def assert_warning(msg: str, warning: Any):
+ def assert_warning(msg, warning):
error = "Text '{}' not in warnings".format(msg)
assert any(msg in str(w) for w in warning.warnings), error
@@ -100,7 +93,7 @@ class TestMovingCoreToContrib(TestCase):
class_ = getattr(module, class_name)
if isabstract(class_) and not parent:
- class_name = f"Mock({class_.__name__})"
+ class_name = "Mock({class_name})".format(class_name=class_.__name__)
attributes = {
a: mock.MagicMock() for a in class_.__abstractmethods__