Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 19:20:43 UTC

[airflow] branch v1-10-test updated (9f1b66f -> 24aae74)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 9f1b66f  Fixes issue with affinity backcompat in Airflow 1.10
 discard 8cce998  Pass SQLAlchemy engine options to FAB based UI (#11395)
     new 00bf5e5  Pass SQLAlchemy engine options to FAB based UI (#11395)
     new fcb784f  When sending tasks to celery from a sub-process, reset signal handlers (#11278)
     new 24aae74  Fixes issue with affinity backcompat in Airflow 1.10

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (9f1b66f)
            \
             N -- N -- N   refs/heads/v1-10-test (24aae74)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/executors/celery_executor.py |  9 ++++++++-
 airflow/www/app.py                   |  6 +++---
 tests/core/test_sqlalchemy_config.py |  3 ++-
 tests/www/test_app.py                | 25 -------------------------
 tests/www_rbac/test_app.py           |  4 ++--
 5 files changed, 15 insertions(+), 32 deletions(-)


[airflow] 03/03: Fixes issue with affinity backcompat in Airflow 1.10

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24aae747117ff329e5448782ec923786c9206582
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Nov 19 10:29:59 2020 -0800

    Fixes issue with affinity backcompat in Airflow 1.10
    
    There was a breaking change in 1.10.12 where the affinity argument
    was being turned into a k8s.V1Affinity object instead of a Python dict.
    
    This commit solves https://github.com/apache/airflow/issues/11731
---
 airflow/kubernetes/pod_launcher.py    |  2 +-
 tests/kubernetes/test_pod_launcher.py | 34 +++++++++++++++++++++++++++++++---
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 704a77e..468e077 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -326,7 +326,7 @@ def _convert_to_airflow_pod(pod):
         resources=base_container.resources,
         service_account_name=pod.spec.service_account_name,
         secrets=secrets,
-        affinity=pod.spec.affinity,
+        affinity=api_client.sanitize_for_serialization(pod.spec.affinity),
         hostnetwork=pod.spec.host_network,
         security_context=_extract_security_context(pod.spec.security_context)
     )
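
For context, the one-line fix relies on the kubernetes client's
ApiClient.sanitize_for_serialization(), which recursively converts model
objects into the plain camelCase dicts the 1.10 Pod class expects. A minimal
standalone sketch of that behaviour (not part of this commit), assuming the
kubernetes Python client is installed:

    from kubernetes import client as k8s

    api_client = k8s.ApiClient()
    affinity = k8s.V1Affinity(
        node_affinity=k8s.V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=k8s.V1NodeSelector(
                node_selector_terms=[
                    k8s.V1NodeSelectorTerm(
                        match_expressions=[
                            k8s.V1NodeSelectorRequirement(
                                key="disktype", operator="In", values=["ssd"])
                        ])
                ])))

    # Model objects become plain dicts with camelCase keys.
    as_dict = api_client.sanitize_for_serialization(affinity)
    assert isinstance(as_dict, dict)
    assert "nodeAffinity" in as_dict
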
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 63169ae..00198fe 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -175,9 +175,26 @@ class TestPodLauncherHelper(unittest.TestCase):
         input_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
                 name="foo",
-                namespace="bar"
+                namespace="bar",
+                annotations={"foo": "bar"}
             ),
             spec=k8s.V1PodSpec(
+                affinity=k8s.V1Affinity(
+                    pod_anti_affinity=k8s.V1PodAntiAffinity(
+                        required_during_scheduling_ignored_during_execution=[
+                            k8s.V1WeightedPodAffinityTerm(
+                                weight=1,
+                                pod_affinity_term=k8s.V1PodAffinityTerm(
+                                    label_selector=k8s.V1LabelSelector(
+                                        match_expressions=[
+                                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
+                                        ]
+                                    ),
+                                    topology_key="failure-domain.beta.kubernetes.io/zone",
+                                ),
+                            )
+                        ]
+                    )),
                 init_containers=[
                     k8s.V1Container(
                         name="init-container",
@@ -256,9 +273,12 @@ class TestPodLauncherHelper(unittest.TestCase):
         )
         result_pod = _convert_to_airflow_pod(input_pod)
 
+        self.assertEqual(type(result_pod.affinity), dict)
+
         expected = Pod(
             name="foo",
             namespace="bar",
+            annotations={"foo": "bar"},
             envs={},
             init_containers=[
                 {'name': 'init-container', 'volumeMounts': [{'mountPath': '/tmp', 'name': 'init-secret'}]}
@@ -288,6 +308,14 @@ class TestPodLauncherHelper(unittest.TestCase):
                     read_only=True
                 )],
             image_pull_secrets="my-secret",
+            affinity={'podAntiAffinity':
+                        {'requiredDuringSchedulingIgnoredDuringExecution':
+                            [{'podAffinityTerm':
+                                {'labelSelector':
+                                    {'matchExpressions':
+                                        [{'key': 'security', 'operator': 'In', 'values': 'S1'}]},
+                                    'topologyKey': 'failure-domain.beta.kubernetes.io/zone'},
+                                'weight': 1}]}},
             secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
             security_context={'fsGroup': 0, 'runAsUser': 0},
             volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
@@ -295,8 +323,8 @@ class TestPodLauncherHelper(unittest.TestCase):
                                                             'name': 'airflow-config'}),
                      Volume(name='airflow-secret', configs={'name': 'airflow-secret',
                                                             'secret': {'secretName': 'secret-name'}}),
-                     Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
-                            {'secretName': 'init-secret'}})],
+                     Volume(name='init-secret', configs={'name': 'init-secret',
+                                                         'secret': {'secretName': 'init-secret'}})],
         )
         expected_dict = expected.as_dict()
         result_dict = result_pod.as_dict()


[airflow] 01/03: Pass SQLAlchemy engine options to FAB based UI (#11395)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 00bf5e5a0ca55a8750a612457fd0e450e0b06e02
Author: MichaƂ Misiewicz <mi...@gmail.com>
AuthorDate: Fri Oct 16 19:55:41 2020 +0200

    Pass SQLAlchemy engine options to FAB based UI (#11395)
    
    Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
    (cherry picked from commit 91484b938f0b6f943404f1aeb3e63b61b808cfe9)
---
 airflow/settings.py                  | 57 +++++++++++++++++++-----------------
 airflow/www/app.py                   |  3 ++
 airflow/www_rbac/app.py              |  4 +++
 tests/core/test_sqlalchemy_config.py |  3 +-
 tests/www/test_app.py                |  1 -
 tests/www_rbac/test_app.py           | 26 +++++++++++++++-
 6 files changed, 64 insertions(+), 30 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index e39c960..0f35dc5 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -36,7 +36,6 @@ from sqlalchemy.pool import NullPool
 
 from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG  # NOQA F401
 from airflow.logging_config import configure_logging
-from airflow.utils.module_loading import import_string
 from airflow.utils.sqlalchemy import setup_event_handlers
 
 log = logging.getLogger(__name__)
@@ -233,12 +232,38 @@ def configure_orm(disable_connection_pool=False):
     log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
     global engine
     global Session
-    engine_args = {}
+    engine_args = prepare_engine_args(disable_connection_pool)
+
+    # Allow the user to specify an encoding for their DB otherwise default
+    # to utf-8 so jobs & users with non-latin1 characters can still use us.
+    engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
+
+    # For Python2 we get back a newstr and need a str
+    engine_args['encoding'] = engine_args['encoding'].__str__()
+
+    if conf.has_option('core', 'sql_alchemy_connect_args'):
+        connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
+    else:
+        connect_args = {}
+
+    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
+    setup_event_handlers(engine)
+
+    Session = scoped_session(sessionmaker(
+        autocommit=False,
+        autoflush=False,
+        bind=engine,
+        expire_on_commit=False,
+    ))
 
+
+def prepare_engine_args(disable_connection_pool=False):
+    """Prepare SQLAlchemy engine args"""
+    engine_args = {}
     pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
     if disable_connection_pool or not pool_connections:
         engine_args['poolclass'] = NullPool
-        log.debug("settings.configure_orm(): Using NullPool")
+        log.debug("settings.prepare_engine_args(): Using NullPool")
     elif 'sqlite' not in SQL_ALCHEMY_CONN:
         # Pool size engine args not supported by sqlite.
         # If no config value is defined for the pool size, select a reasonable value.
@@ -270,35 +295,13 @@ def configure_orm(disable_connection_pool=False):
         # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
         pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)
 
-        log.debug("settings.configure_orm(): Using pool settings. pool_size=%d, max_overflow=%d, "
+        log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
                   "pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid())
         engine_args['pool_size'] = pool_size
         engine_args['pool_recycle'] = pool_recycle
         engine_args['pool_pre_ping'] = pool_pre_ping
         engine_args['max_overflow'] = max_overflow
-
-    # Allow the user to specify an encoding for their DB otherwise default
-    # to utf-8 so jobs & users with non-latin1 characters can still use
-    # us.
-    engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
-    # For Python2 we get back a newstr and need a str
-    engine_args['encoding'] = engine_args['encoding'].__str__()
-
-    if conf.has_option('core', 'sql_alchemy_connect_args'):
-        connect_args = import_string(
-            conf.get('core', 'sql_alchemy_connect_args')
-        )
-    else:
-        connect_args = {}
-
-    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
-    setup_event_handlers(engine)
-
-    Session = scoped_session(
-        sessionmaker(autocommit=False,
-                     autoflush=False,
-                     bind=engine,
-                     expire_on_commit=False))
+    return engine_args
 
 
 def dispose_orm():
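
For context, prepare_engine_args() centralises the pool settings so the same
dict can be handed both to the ORM here and to the FAB-based UI (see the
www_rbac/app.py change below). The dict is ultimately splatted into
SQLAlchemy's create_engine(); a minimal standalone sketch of that pattern
(illustrative values, assuming a Postgres URL with psycopg2 installed):

    from sqlalchemy import create_engine

    engine_args = {
        'pool_size': 5,
        'max_overflow': 10,
        'pool_recycle': 1800,   # recycle connections after 30 minutes
        'pool_pre_ping': True,  # test connections for liveness on checkout
    }
    engine = create_engine('postgresql+psycopg2://user:pass@localhost/airflow',
                           **engine_args)
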
diff --git a/airflow/www/app.py b/airflow/www/app.py
index b101f45..7d0dae7 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -17,9 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import datetime
 import logging
 from typing import Any
 
+import flask
+import flask_login
 import six
 from flask import Flask
 from flask_admin import Admin, base
diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py
index 46ad120..29a364b 100644
--- a/airflow/www_rbac/app.py
+++ b/airflow/www_rbac/app.py
@@ -46,6 +46,7 @@ csrf = CSRFProtect()
 
 log = logging.getLogger(__name__)
 
+
 def create_app(config=None, session=None, testing=False, app_name="Airflow"):
     global app, appbuilder
     app = Flask(__name__)
@@ -76,6 +77,9 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
     if config:
         app.config.from_mapping(config)
 
+    if 'SQLALCHEMY_ENGINE_OPTIONS' not in app.config:
+        app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args()
+
     csrf.init_app(app)
 
     db = SQLA(app)
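
SQLALCHEMY_ENGINE_OPTIONS is the standard Flask-SQLAlchemy config key (added
in Flask-SQLAlchemy 2.4) whose value is passed through to create_engine(), so
pre-populating it here makes the FAB UI's engine match the ORM's pool settings
unless the user overrides it. A minimal standalone sketch of the mechanism,
independent of Airflow:

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
    # Passed verbatim to sqlalchemy.create_engine() by Flask-SQLAlchemy.
    app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'pool_pre_ping': True}
    db = SQLAlchemy(app)
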
diff --git a/tests/core/test_sqlalchemy_config.py b/tests/core/test_sqlalchemy_config.py
index 6fa7ac9..99d5de8 100644
--- a/tests/core/test_sqlalchemy_config.py
+++ b/tests/core/test_sqlalchemy_config.py
@@ -19,6 +19,7 @@
 
 import unittest
 
+from airflow.exceptions import AirflowConfigException
 from sqlalchemy.pool import NullPool
 
 from airflow import settings
@@ -102,6 +103,6 @@ class TestSqlAlchemySettings(unittest.TestCase):
             ('core', 'sql_alchemy_connect_args'): 'does.not.exist',
             ('core', 'sql_alchemy_pool_enabled'): 'False'
         }
-        with self.assertRaises(ImportError):
+        with self.assertRaises(AirflowConfigException):
             with conf_vars(config):
                 settings.configure_orm()
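
The expected exception changes because conf.getimport (used in the settings.py
hunk above) wraps a failed import of the configured dotted path in an
AirflowConfigException rather than letting the ImportError from import_string
escape. Roughly, the wrapping pattern looks like this (an illustrative sketch,
not Airflow's exact code):

    from importlib import import_module

    from airflow.exceptions import AirflowConfigException

    def getimport_sketch(dotted_path):
        module_path, _, name = dotted_path.rpartition('.')
        try:
            return getattr(import_module(module_path), name)
        except (ImportError, AttributeError, ValueError):
            raise AirflowConfigException(
                'Could not import: %s' % dotted_path)
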
diff --git a/tests/www/test_app.py b/tests/www/test_app.py
index 64255aa..56ec213 100644
--- a/tests/www/test_app.py
+++ b/tests/www/test_app.py
@@ -16,7 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import unittest
 
 from werkzeug.middleware.proxy_fix import ProxyFix
diff --git a/tests/www_rbac/test_app.py b/tests/www_rbac/test_app.py
index 71d6255..633176e 100644
--- a/tests/www_rbac/test_app.py
+++ b/tests/www_rbac/test_app.py
@@ -16,13 +16,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import json
 import unittest
 
+import pytest
+import six
 from werkzeug.middleware.proxy_fix import ProxyFix
 
 from airflow.settings import Session
 from airflow.www_rbac import app as application
+from tests.compat import mock
 from tests.test_utils.config import conf_vars
 
 
@@ -56,3 +59,24 @@ class TestApp(unittest.TestCase):
         self.assertEqual(app.wsgi_app.x_host, 5)
         self.assertEqual(app.wsgi_app.x_port, 6)
         self.assertEqual(app.wsgi_app.x_prefix, 7)
+
+    @conf_vars({
+        ('core', 'sql_alchemy_pool_enabled'): 'True',
+        ('core', 'sql_alchemy_pool_size'): '3',
+        ('core', 'sql_alchemy_max_overflow'): '5',
+        ('core', 'sql_alchemy_pool_recycle'): '120',
+        ('core', 'sql_alchemy_pool_pre_ping'): 'True',
+    })
+    @mock.patch("airflow.www_rbac.app.app", None)
+    @pytest.mark.backend("mysql", "postgres")
+    def test_should_set_sqlalchemy_engine_options(self):
+        app = application.cached_appbuilder(testing=True).app
+        engine_params = {
+            'pool_size': 3,
+            'pool_recycle': 120,
+            'pool_pre_ping': True,
+            'max_overflow': 5
+        }
+        if six.PY2:
+            engine_params = json.dumps(engine_params)
+        self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)


[airflow] 02/03: When sending tasks to celery from a sub-process, reset signal handlers (#11278)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fcb784f798d76f4a1500377a6ea0f30187705131
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Oct 5 14:16:08 2020 +0100

    When sending tasks to celery from a sub-process, reset signal handlers (#11278)
    
    Since these processes are spawned from SchedulerJob after it has
    registered its signal handlers, any of them that got signalled would
    kill the ProcessorAgent process group! (multiprocessing defaults to
    fork on Linux, so they inherit all previous state -- signal handlers,
    and access to the `_process.pid` inside the ProcessorAgent instance.)
    
    This behaviour is not what we want for these multiprocessing.Pool
    processes.
    
    This _may_ be a source of the long-standing "scheduler is alive but not
    scheduling any jobs" reports. Maybe.
    
    (cherry picked from commit baa980fbc83b25b5cf0700e567e69c2eb156412f)
---
 airflow/executors/celery_executor.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 128b25b..35b4e84 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -219,7 +219,14 @@ class CeleryExecutor(BaseExecutor):
             chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
             num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
 
-            send_pool = Pool(processes=num_processes)
+            def reset_signals():
+                # Since we are run from inside the SchedulerJob, we don't want
+                # to inherit the signal handlers that we registered there.
+                import signal
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+            send_pool = Pool(processes=num_processes, initializer=reset_signals)
             key_and_async_results = send_pool.map(
                 send_task_to_executor,
                 task_tuples_to_send,
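
The initializer runs once in each pool worker before it picks up any work,
restoring default SIGINT/SIGTERM behaviour so a signal delivered to a worker
no longer fires the scheduler's handlers. A standalone sketch of the pattern
(illustrative, not this module's code):

    import signal
    from multiprocessing import Pool

    def reset_signals():
        # Forked workers inherit the parent's handlers; restore the defaults.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

    def square(x):
        return x * x

    if __name__ == '__main__':
        pool = Pool(processes=2, initializer=reset_signals)
        print(pool.map(square, [1, 2, 3]))
        pool.close()
        pool.join()
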