Posted to commits@airflow.apache.org by "Anton Cherkasov (JIRA)" <ji...@apache.org> on 2019/04/12 16:29:00 UTC

[jira] [Created] (AIRFLOW-4298) Scheduler can't communicate with Postgres as Metadata DB after upgrade from 1.10.2 to 1.10.3

Anton Cherkasov created AIRFLOW-4298:
----------------------------------------

             Summary: Scheduler can't communicate with Postgres as Metadata DB after upgrade from 1.10.2 to 1.10.3
                 Key: AIRFLOW-4298
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4298
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.10.3
         Environment: Main host with Airflow services: CentOS 7, Python 3.6.1.
DB: official Docker image with PostgreSQL 11.2-alpine
            Reporter: Anton Cherkasov


I have a strange issue with the scheduler after upgrading from 1.10.2 to 1.10.3. DAG tasks run only once; after that, the scheduler log looks like this:
{noformat}
Apr 11 09:21:33 airflow.infra airflow[32739]: [2019-04-11 09:21:33,094] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:21:44 airflow.infra airflow[32739]: [2019-04-11 09:21:44,105] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:21:55 airflow.infra airflow[32739]: [2019-04-11 09:21:55,114] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:06 airflow.infra airflow[32739]: [2019-04-11 09:22:06,123] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:17 airflow.infra airflow[32739]: [2019-04-11 09:22:17,131] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:28 airflow.infra airflow[32739]: [2019-04-11 09:22:28,143] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
{noformat}
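This warning appears to come from the SQLAlchemy "pessimistic disconnect" pattern used at {{sqlalchemy.py:81}}: a listener on the {{engine_connect}} event pings the connection before it is handed out and reconnects if the ping shows the connection was invalidated. Below is a minimal sketch of that pattern (not Airflow's actual code), assuming SQLAlchemy 1.x as shipped with Airflow 1.10:
{code:python}
# Minimal sketch (NOT Airflow's actual code) of the SQLAlchemy "pessimistic
# disconnect" pattern that produces this kind of warning: an engine_connect
# listener pings the connection before use and reconnects if the ping fails
# with an invalidated connection. Written against SQLAlchemy 1.x.
import logging

from sqlalchemy import create_engine, event, exc, text

log = logging.getLogger(__name__)

# sqlite:// is used only so the sketch runs without a real Postgres;
# the report uses postgresql+psycopg2.
engine = create_engine("sqlite://")


@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "Branch" connections share the parent's DBAPI connection; skip them.
        return
    try:
        connection.scalar(text("SELECT 1"))
    except exc.DBAPIError as err:
        if err.connection_invalidated:
            # Roughly the situation the scheduler logs as
            # "DB connection invalidated. Reconnecting..."
            log.warning("DB connection invalidated. Reconnecting...")
            connection.scalar(text("SELECT 1"))  # retry on a fresh connection
        else:
            raise


with engine.connect() as conn:
    print(conn.scalar(text("SELECT 1")))  # -> 1
{code}
On SQLAlchemy 1.2+, {{create_engine(..., pool_pre_ping=True)}} provides a similar built-in check.
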
Scheduler logs at *DEBUG* level:
{noformat}
Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:154}} DEBUG - Setting up DB connection pool (PID 17447)
Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=100, pool_recycle=3600, pid=17447
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,450] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17449)
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,535] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17448)
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,706] {{jobs.py:1663}} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
. . .
Apr 11 09:01:29 airflow.infra airflow[17403]: [2019-04-11 09:01:29,884] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21866)
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,492] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21865)
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,785] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor73-Process, stopped)>
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor74-Process, stopped)>
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,790] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,765] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21910)
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor76-Process, stopped)>
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,866] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21909)
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,468] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21921)
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor75-Process, stopped)>
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor77-Process, stopped)>
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,358] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21933)
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,521] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21934)
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,788] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor78-Process, stopped)>
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor79-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,456] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21949)
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,679] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21950)
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor80-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor81-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,892] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21964)
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,405] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21963)
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,790] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor82-Process, stopped)>
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor83-Process, stopped)>
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,834] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21994)
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,462] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21993)
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,792] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor84-Process, stopped)>
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor85-Process, stopped)>
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,509] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22003)
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,672] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22004)
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor86-Process, stopped)>
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,794] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor87-Process, stopped)>
Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,472] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22010)
Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,795] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor88-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,522] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22054)
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,588] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22053)
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor89-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor90-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,888] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22060)
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,460] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22061)
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,798] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor91-Process, stopped)>
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor92-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,459] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22088)
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,478] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22089)
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor93-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor94-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,804] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
{noformat}
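The per-PID {{Setting up DB connection pool}} / {{Disposing DB connection pool}} lines appear to come from the DagFileProcessor child processes, each managing its own pool. As a hedged illustration only (not a confirmed diagnosis of this ticket): in SQLAlchemy, a forked child that disposes an engine inherited from its parent closes DBAPI connections the parent may still hold, and the parent then sees those connections as invalidated. A minimal sketch of that hazard, with a placeholder DSN:
{code:python}
# Illustrative sketch only -- not Airflow code and not a confirmed diagnosis.
# Shows the general SQLAlchemy fork-then-dispose hazard: a child process that
# disposes an engine inherited from its parent closes DBAPI connections the
# parent may still rely on. Needs a reachable Postgres; the DSN is a
# placeholder, not the reporter's real connection string.
import os

from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/airflow",  # placeholder DSN
    pool_size=100,
    pool_recycle=3600,
)

# Parent opens (and returns to the pool) a real connection to Postgres.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

pid = os.fork()
if pid == 0:
    # Child: it inherited the parent's pooled DBAPI connections. Disposing the
    # engine here closes them, which can break the parent's use of the same
    # connections. SQLAlchemy's guidance is that a forked child must never
    # reuse inherited connections and should start from a fresh pool instead.
    engine.dispose()
    os._exit(0)

os.waitpid(pid, 0)

# Parent: the next checkout can now hit a dead connection -- the kind of
# condition surfaced as "DB connection invalidated. Reconnecting..."
try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
except Exception as err:
    print("parent connection broken after child dispose:", err)
{code}
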
The webserver and worker work fine, and if I run a task by hand it works.
I'm using PostgreSQL in Docker as the metadata DB.

I'm using CeleryExecutor with RabbitMQ as the broker.

Relevant parts of airflow.cfg:
{noformat}
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information on
# their website
sql_alchemy_conn = postgresql+psycopg2://{{airflow_db_user}}:{{airflow_db_pass}}@{{airflow_db_host}}:12411/airflow

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 100

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 3600

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 116

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 4
{noformat}
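For reference, these values match the scheduler's DEBUG line above ({{settings.configure_orm(): Using pool settings. pool_size=100, pool_recycle=3600}}). A minimal sketch of how such pool options are typically handed to SQLAlchemy (placeholder DSN; not the actual {{settings.configure_orm}} implementation):
{code:python}
# Minimal sketch of how the [core] pool options above typically map onto
# SQLAlchemy engine arguments. Placeholder DSN; not Airflow's settings module.
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

SQL_ALCHEMY_CONN = "postgresql+psycopg2://user:pass@db-host:12411/airflow"  # placeholder

engine = create_engine(
    SQL_ALCHEMY_CONN,
    pool_size=100,      # sql_alchemy_pool_size
    pool_recycle=3600,  # sql_alchemy_pool_recycle: recycle connections older than 1 hour
)
Session = scoped_session(sessionmaker(bind=engine))
{code}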

{noformat}
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# after how much time should the scheduler terminate in seconds
# -1 indicates to run continuously (see also num_runs)
run_duration = -1

# after how much time (seconds) a new DAGs should be picked up from the filesystem
min_file_process_interval = 0

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

# How often should stats be printed to the logs
print_stats_interval = 30

child_process_log_directory = {{ airflow_child_scheduler_logs_folder }}

# Local task jobs periodically heartbeat to the DB. If the job has
# not heartbeat in this many seconds, the scheduler will mark the
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True

# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by one
# or more of the following:
#  - reversion to full table scan
#  - complexity of query predicate
#  - excessive locking
#
# Additionally, you may hit the maximum allowable query length for your db.
#
# Set this to 0 for no limit (not advised)
max_tis_per_query = 0

# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2

authenticate = False
{noformat}


DB configuration:
{noformat}
-c max_connections=1000
-c shared_buffers=1GB
-c effective_cache_size=3GB
-c maintenance_work_mem=256MB
-c checkpoint_completion_target=0.7
-c wal_buffers=16MB
-c default_statistics_target=100
-c random_page_cost=1.1
-c max_wal_size=2GB
-c work_mem=104857kB
-c min_wal_size=1GB
-c max_wal_senders=5
-c max_worker_processes=2
-c max_parallel_workers_per_gather=1
-c max_parallel_workers=2
{noformat}


