You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/12 14:56:51 UTC

[airflow] branch v1-10-test updated (5ab6543 -> f41f9ec)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 5ab6543  Fixes name of pre-commit cache for multiple branches (#10299)
     new ff4baf6  Add Snowflake support to SQL operator and sensor (#9843)
     new 7061c83  Avoid sharing session with RenderedTaskInstanceFields write and delete (#9993)
     new ab5343d  [AIRFLOW-6931] Fixed migrations to find all dependencies for MSSQL (#9891)
     new f41f9ec  Fix regression in SQLThresholdCheckOperator (#9312)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py |  2 +-
 airflow/models/taskinstance.py                               |  4 ++--
 airflow/operators/sql.py                                     |  7 ++++---
 airflow/sensors/sql_sensor.py                                |  2 +-
 tests/operators/test_sql.py                                  | 12 ++++++------
 5 files changed, 14 insertions(+), 13 deletions(-)


[airflow] 01/04: Add Snowflake support to SQL operator and sensor (#9843)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ff4baf67a81982af0333b3788b0781e17a9d9149
Author: Andy <an...@gmail.com>
AuthorDate: Fri Jul 17 02:04:14 2020 -0500

    Add Snowflake support to SQL operator and sensor (#9843)
    
    * Add Snowflake support to SQL operator and sensor
    * Add test for conn_type to valid hook mapping
    * Improve code quality for conn type mapping test
    
    (cherry picked from commit 9c68e7cc6fc1bf7c5a9a0156a2f0cf166cf2dfbe)
---
 airflow/operators/sql.py      | 1 +
 airflow/sensors/sql_sensor.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 1e5b090..57083d2 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -33,6 +33,7 @@ ALLOWED_CONN_TYPE = {
     "oracle",
     "postgres",
     "presto",
+    "snowflake",
     "sqlite",
     "vertica",
 }
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql_sensor.py
index 54b2982..47a375c 100644
--- a/airflow/sensors/sql_sensor.py
+++ b/airflow/sensors/sql_sensor.py
@@ -77,7 +77,7 @@ class SqlSensor(BaseSensorOperator):
 
         allowed_conn_type = {'google_cloud_platform', 'jdbc', 'mssql',
                              'mysql', 'oracle', 'postgres',
-                             'presto', 'sqlite', 'vertica'}
+                             'presto', 'snowflake', 'sqlite', 'vertica'}
         if conn.conn_type not in allowed_conn_type:
             raise AirflowException("The connection type is not supported by SqlSensor. " +
                                    "Supported connection types: {}".format(list(allowed_conn_type)))


[airflow] 02/04: Avoid sharing session with RenderedTaskInstanceFields write and delete (#9993)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7061c8316f99c6d91026acbd279812f2808870f4
Author: zikun <33...@users.noreply.github.com>
AuthorDate: Sun Jul 26 01:55:05 2020 +0800

    Avoid sharing session with RenderedTaskInstanceFields write and delete (#9993)
    
    Sharing session with RTIF leads to idle-in-transaction timeout error when DAG serialization is enabled and task running duration exceeds the idle-in-transaction timeout setting of the database.
    
    (cherry picked from commit ffcd0604e3f15e1b2a3ac1d8c97a35be7374cfab)
---
 airflow/models/taskinstance.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 0e5d6e3..ae296ba 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -964,8 +964,8 @@ class TaskInstance(Base, LoggingMixin):
 
                 self.render_templates(context=context)
                 if STORE_SERIALIZED_DAGS:
-                    RTIF.write(RTIF(ti=self, render_templates=False), session=session)
-                    RTIF.delete_old_records(self.task_id, self.dag_id, session=session)
+                    RTIF.write(RTIF(ti=self, render_templates=False))
+                    RTIF.delete_old_records(self.task_id, self.dag_id)
 
                 task_copy.pre_execute(context=context)
 


[airflow] 03/04: [AIRFLOW-6931] Fixed migrations to find all dependencies for MSSQL (#9891)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ab5343d939b0e37fe9aa5384debceb8cffcccec8
Author: BaoshanGu <Ba...@gmail.com>
AuthorDate: Tue Jul 21 19:11:39 2020 -0400

    [AIRFLOW-6931] Fixed migrations to find all dependencies for MSSQL (#9891)
    
    (cherry picked from commit 8a405d25fa869d8f3ab7f34b7ee9ca1d5909743b)
---
 .../versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py b/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py
index a9ef785..c5630dd 100644
--- a/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py
+++ b/airflow/migrations/versions/74effc47d867_change_datetime_to_datetime2_6_on_mssql_.py
@@ -231,7 +231,7 @@ def get_table_constraints(conn, table_name):
      FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
      JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
      WHERE tc.TABLE_NAME = '{table_name}' AND
-     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or tc.CONSTRAINT_TYPE = 'Unique')
+     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
     """.format(table_name=table_name)
     result = conn.execute(query).fetchall()
     constraint_dict = defaultdict(list)


[airflow] 04/04: Fix regression in SQLThresholdCheckOperator (#9312)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f41f9ec259891c018338d0e33fff47dd2f77f9b2
Author: Danylo Baibak <da...@gmail.com>
AuthorDate: Wed Jul 1 00:22:37 2020 +0200

    Fix regression in SQLThresholdCheckOperator (#9312)
    
    (cherry picked from commit 87d83a1ae334951c6958689f3bb0cc09f6fc647b)
---
 airflow/operators/sql.py    |  6 +++---
 tests/operators/test_sql.py | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 57083d2..dbc28dc 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -439,17 +439,17 @@ class SQLThresholdCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        result = hook.get_first(self.sql)[0][0]
+        result = hook.get_first(self.sql)[0]
 
         if isinstance(self.min_threshold, float):
             lower_bound = self.min_threshold
         else:
-            lower_bound = hook.get_first(self.min_threshold)[0][0]
+            lower_bound = hook.get_first(self.min_threshold)[0]
 
         if isinstance(self.max_threshold, float):
             upper_bound = self.max_threshold
         else:
-            upper_bound = hook.get_first(self.max_threshold)[0][0]
+            upper_bound = hook.get_first(self.max_threshold)[0]
 
         meta_data = {
             "result": result,
diff --git a/tests/operators/test_sql.py b/tests/operators/test_sql.py
index 6ccc5fa..0c7d792 100644
--- a/tests/operators/test_sql.py
+++ b/tests/operators/test_sql.py
@@ -259,7 +259,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_pass_min_value_max_value(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
+        mock_hook.get_first.return_value = (10,)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator(
@@ -271,7 +271,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_fail_min_value_max_value(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.return_value = [(10,)]
+        mock_hook.get_first.return_value = (10,)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator(
@@ -284,7 +284,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_pass_min_sql_max_sql(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_hook.get_first.side_effect = lambda x: (int(x.split()[1]),)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator(
@@ -295,7 +295,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_fail_min_sql_max_sql(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_hook.get_first.side_effect = lambda x: (int(x.split()[1]),)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator(
@@ -307,7 +307,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_pass_min_value_max_sql(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_hook.get_first.side_effect = lambda x: (int(x.split()[1]),)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator("Select 75", 45, "Select 100")
@@ -317,7 +317,7 @@ class TestThresholdCheckOperator(unittest.TestCase):
     @mock.patch.object(ThresholdCheckOperator, "get_db_hook")
     def test_fail_min_sql_max_value(self, mock_get_db_hook):
         mock_hook = mock.Mock()
-        mock_hook.get_first.side_effect = lambda x: [(int(x.split()[1]),)]
+        mock_hook.get_first.side_effect = lambda x: (int(x.split()[1]),)
         mock_get_db_hook.return_value = mock_hook
 
         operator = self._construct_operator("Select 155", "Select 45", 100)