Posted to commits@airflow.apache.org by "Gerardo Curiel (JIRA)" <ji...@apache.org> on 2019/05/13 05:14:00 UTC

[jira] [Commented] (AIRFLOW-4298) Stop Scheduler warning repeatedly about "connection invalidated"

    [ https://issues.apache.org/jira/browse/AIRFLOW-4298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16838267#comment-16838267 ] 

Gerardo Curiel commented on AIRFLOW-4298:
-----------------------------------------

[~ash]
According to reports in the Slack group (and my own experience), the message shows up every 11 seconds. Here's the source: https://github.com/apache/airflow/blob/master/airflow/utils/sqlalchemy.py#L76.

The warning comes from Airflow itself, not SQLAlchemy.
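
For context, the handler at that line is Airflow's variant of the "pessimistic disconnect handling" recipe from the SQLAlchemy docs: on each engine-level connect it pings the DB with a SELECT 1 and, if the pool has invalidated the connection, logs this warning and reconnects. Below is a condensed sketch of that recipe, not a verbatim copy of Airflow's code (names are illustrative; the real code also retries with backoff, bounded by sql_alchemy_reconnect_timeout):

{noformat}
import logging

# SQLAlchemy 1.x style, as used by the Airflow 1.10 series.
from sqlalchemy import create_engine, event, exc, select

log = logging.getLogger(__name__)

# Illustrative engine URL; substitute your own sql_alchemy_conn.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/airflow")


@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # A "branch" is a sub-connection of an already-pinged connection;
        # no need to test it again.
        return
    try:
        # Cheap liveness check each time a connection is handed out.
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        if err.connection_invalidated:
            # The pool marked the connection dead (e.g. the server or a
            # proxy closed it). This is the line that floods the log.
            log.warning("DB connection invalidated. Reconnecting...")
            # Running the ping again transparently re-establishes it.
            connection.scalar(select([1]))
        else:
            raise
{noformat}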

> Stop Scheduler warning repeatedly about "connection invalidated"
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-4298
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4298
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.10.3
>         Environment: Main host with Airflow services: CentOS 7, Python 3.6.1.
> DB: official Docker image with PostgreSQL 11.2-alpine
>            Reporter: Anton Cherkasov
>            Priority: Minor
>
> I have a strange issue with the scheduler after upgrading from 1.10.2 to 1.10.3. DAG tasks run only once; after that, the scheduler logs look like this:
> {noformat}
> Apr 11 09:21:33 airflow.infra airflow[32739]: [2019-04-11 09:21:33,094] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:21:44 airflow.infra airflow[32739]: [2019-04-11 09:21:44,105] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:21:55 airflow.infra airflow[32739]: [2019-04-11 09:21:55,114] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:06 airflow.infra airflow[32739]: [2019-04-11 09:22:06,123] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:17 airflow.infra airflow[32739]: [2019-04-11 09:22:17,131] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:22:28 airflow.infra airflow[32739]: [2019-04-11 09:22:28,143] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> {noformat}
> Scheduler logs at *DEBUG* level:
> {noformat}
> Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:154}} DEBUG - Setting up DB connection pool (PID 17447)
> Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=100, pool_recycle=3600, pid=17447
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,450] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17449)
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,535] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17448)
> Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,706] {{jobs.py:1663}} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
> . . .
> Apr 11 09:01:29 airflow.infra airflow[17403]: [2019-04-11 09:01:29,884] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21866)
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,492] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21865)
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,785] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor73-Process, stopped)>
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor74-Process, stopped)>
> Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,790] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,765] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21910)
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor76-Process, stopped)>
> Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,866] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21909)
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,468] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21921)
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor75-Process, stopped)>
> Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor77-Process, stopped)>
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,358] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21933)
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,521] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21934)
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,788] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor78-Process, stopped)>
> Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor79-Process, stopped)>
> Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,456] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21949)
> Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,679] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21950)
> Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor80-Process, stopped)>
> Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor81-Process, stopped)>
> Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,892] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21964)
> Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,405] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21963)
> Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,790] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor82-Process, stopped)>
> Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor83-Process, stopped)>
> Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,834] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21994)
> Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,462] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21993)
> Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,792] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor84-Process, stopped)>
> Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor85-Process, stopped)>
> Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,509] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22003)
> Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,672] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22004)
> Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor86-Process, stopped)>
> Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,794] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor87-Process, stopped)>
> Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,472] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22010)
> Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,795] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor88-Process, stopped)>
> Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,522] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22054)
> Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,588] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22053)
> Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor89-Process, stopped)>
> Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor90-Process, stopped)>
> Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,888] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22060)
> Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,460] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22061)
> Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,798] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor91-Process, stopped)>
> Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor92-Process, stopped)>
> Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,459] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22088)
> Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,478] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22089)
> Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor93-Process, stopped)>
> Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor94-Process, stopped)>
> Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,804] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
> {noformat}
> The webserver and workers work fine; if I run a task by hand, it works.
> I'm using PostgreSQL in Docker as the metadata DB.
> I'm using CeleryExecutor with RabbitMQ as the broker.
> Relevant parts of airflow.cfg:
> {noformat}
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
> executor = CeleryExecutor
> # The SqlAlchemy connection string to the metadata database.
> # SqlAlchemy supports many different database engines; more information on
> # their website
> sql_alchemy_conn = postgresql+psycopg2://{{airflow_db_user}}:{{airflow_db_pass}}@{{airflow_db_host}}:12411/airflow
> # If SqlAlchemy should pool database connections.
> sql_alchemy_pool_enabled = True
> # The SqlAlchemy pool size is the maximum number of database connections
> # in the pool. 0 indicates no limit.
> sql_alchemy_pool_size = 100
> # The SqlAlchemy pool recycle is the number of seconds a connection
> # can be idle in the pool before it is invalidated. This config does
> # not apply to sqlite. If the number of DB connections is ever exceeded,
> # a lower config value will allow the system to recover faster.
> sql_alchemy_pool_recycle = 3600
> # How many seconds to retry re-establishing a DB connection after
> # disconnects. Setting this to 0 disables retries.
> sql_alchemy_reconnect_timeout = 300
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 116
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 16
> # Are DAGs paused by default at creation
> dags_are_paused_at_creation = True
> # When not using pools, tasks are run in the "default pool",
> # whose size is guided by this config element
> non_pooled_task_slot_count = 128
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 4
> {noformat}
> {noformat}
> [scheduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> # after how much time should the scheduler terminate in seconds
> # -1 indicates to run continuously (see also num_runs)
> run_duration = -1
> # after how much time (seconds) new DAGs should be picked up from the filesystem
> min_file_process_interval = 0
> # How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
> dag_dir_list_interval = 300
> # How often should stats be printed to the logs
> print_stats_interval = 30
> child_process_log_directory = {{ airflow_child_scheduler_logs_folder }}
> # Local task jobs periodically heartbeat to the DB. If the job has
> # not heartbeat in this many seconds, the scheduler will mark the
> # associated task instance as failed and will re-schedule the task.
> scheduler_zombie_task_threshold = 300
> # Turn off scheduler catchup by setting this to False.
> # Default behavior is unchanged and Command Line Backfills still work,
> # but the scheduler will not do scheduler catchup if this is False.
> # It can also be set on a per-DAG basis in the DAG definition (catchup).
> catchup_by_default = True
> # This changes the batch size of queries in the scheduling main loop.
> # If this is too high, SQL query performance may be impacted by one
> # or more of the following:
> #  - reversion to full table scan
> #  - complexity of query predicate
> #  - excessive locking
> #
> # Additionally, you may hit the maximum allowable query length for your db.
> #
> # Set this to 0 for no limit (not advised)
> max_tis_per_query = 0
> # Statsd (https://github.com/etsy/statsd) integration settings
> statsd_on = True
> statsd_host = localhost
> statsd_port = 8125
> statsd_prefix = airflow
> # The scheduler can run multiple threads in parallel to schedule dags.
> # This defines how many threads will run.
> max_threads = 2
> authenticate = False
> {noformat}
> DB configuration:
> {noformat}
> -c max_connections=1000
> -c shared_buffers=1GB
> -c effective_cache_size=3GB
> -c maintenance_work_mem=256MB
> -c checkpoint_completion_target=0.7
> -c wal_buffers=16MB
> -c default_statistics_target=100
> -c random_page_cost=1.1
> -c max_wal_size=2GB
> -c work_mem=104857kB
> -c min_wal_size=1GB
> -c max_wal_senders=5
> -c max_worker_processes=2
> -c max_parallel_workers_per_gather=1
> -c max_parallel_workers=2
> {noformat}


