Posted to commits@airflow.apache.org by "Christian Pfaff (Jira)" <ji...@apache.org> on 2020/11/23 13:12:00 UTC

[jira] [Commented] (AIRFLOW-2946) Connection times out on airflow worker

    [ https://issues.apache.org/jira/browse/AIRFLOW-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237358#comment-17237358 ] 

Christian Pfaff commented on AIRFLOW-2946:
------------------------------------------

I'm facing the same issue and have tried a couple of things:
 * increased sql_alchemy_pool_size and sql_alchemy_max_overflow in airflow.cfg
 * increased max_connections on Postgres

Neither helped.
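For context on those two settings: roughly speaking, every Airflow process that talks to the metadata database keeps its own SQLAlchemy pool, so raising sql_alchemy_pool_size and sql_alchemy_max_overflow also raises the worst-case number of connections Postgres has to accept. A rough budget check, as a minimal sketch (the function names and the process count are illustrative assumptions, not Airflow settings):

{code:python}
def max_db_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Worst case if every process fills its SQLAlchemy QueuePool:
    pool_size persistent connections plus up to max_overflow extra ones."""
    return processes * (pool_size + max_overflow)


def fits_postgres(processes: int, pool_size: int, max_overflow: int,
                  max_connections: int = 100, superuser_reserved: int = 3) -> bool:
    """Compare the worst case with the slots Postgres leaves for ordinary users.
    100 and 3 are the Postgres defaults for max_connections and
    superuser_reserved_connections."""
    return max_db_connections(processes, pool_size, max_overflow) <= max_connections - superuser_reserved


# Illustrative numbers (assumption): 16 processes, pool_size=5, max_overflow=10
print(max_db_connections(16, 5, 10))  # 240
print(fits_postgres(16, 5, 10))       # False
{code}

Note that exceeding max_connections usually surfaces as a Postgres "too many clients" error rather than the TCP-level "Connection timed out" shown in the traceback below.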

Background: the DAG fetches several tens of thousands of objects from a REST API endpoint in parallel via an HttpHook. The requests run in threads, in batches of 100.
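The access pattern described above (threads working through the objects in batches of 100) might look roughly like the following minimal sketch using concurrent.futures; fetch_all, batched and the fetch callable are hypothetical names. In Airflow 1.10, if fetch goes through HttpHook.run, every call builds its session via get_conn, which looks up the connection with get_connection; unless the connection is defined in an AIRFLOW_CONN_* environment variable, that lookup queries the metadata database, so a batch of 100 threads can mean up to 100 concurrent database lookups.

{code:python}
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterator, List


def batched(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_all(ids: List[Any], fetch: Callable[[Any], Any], batch_size: int = 100) -> List[Any]:
    """Process ids batch by batch; the ids inside one batch run in parallel threads."""
    results: List[Any] = []
    for batch in batched(ids, batch_size):
        # One thread per id in the batch; map() returns results in input order.
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            results.extend(pool.map(fetch, batch))
    return results


# Usage with a stand-in for the real HTTP call (hypothetical):
doubled = fetch_all(list(range(250)), lambda object_id: object_id * 2)
{code}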

 

What works for me as a quick fix is a modified version of HttpHook that "caches" the connection in a class-level (static) attribute:
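{code:python}
import requests
from airflow.hooks.base_hook import BaseHook  # Airflow 1.10.x import path


class HttpHook2(BaseHook):
    ...
    # Class-level ("static") cache, shared by every HttpHook2 instance in the process
    conn = None
    ...

    def get_conn(self, headers=None):
        ...
        session = requests.Session()
        if self.http_conn_id:
            # Look up the Airflow connection only once, then reuse the cached object
            if HttpHook2.conn is None:
                HttpHook2.conn = self.get_connection(self.http_conn_id)
            conn = HttpHook2.conn
            ...
{code}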
 
 

Not an elegant solution, I know, but maybe it helps to debug the problem and find a proper fix :)
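One caveat with the quick fix above: HttpHook2.conn is a single class attribute, so if two hooks in the same process use different http_conn_id values, the second one silently receives the first one's connection. A variant keyed by connection id, with a lock so that a burst of threads triggers only one lookup per id, could look like this. It is a minimal, Airflow-independent sketch; ConnectionCache and fake_lookup are hypothetical names, and in a real hook the lookup would presumably be BaseHook.get_connection.

{code:python}
import threading
from typing import Any, Callable, Dict


class ConnectionCache:
    """Process-wide cache of connection objects, keyed by connection id."""

    def __init__(self, lookup: Callable[[str], Any]) -> None:
        self._lookup = lookup              # e.g. BaseHook.get_connection (assumption)
        self._cache: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, conn_id: str) -> Any:
        # Fast path: entries are never removed, so an existing one can be returned without the lock.
        if conn_id in self._cache:
            return self._cache[conn_id]
        with self._lock:
            # Re-check inside the lock: another thread may have filled the entry meanwhile.
            if conn_id not in self._cache:
                self._cache[conn_id] = self._lookup(conn_id)
            return self._cache[conn_id]


# Demo with a hypothetical lookup that records how often it is called.
calls = []


def fake_lookup(conn_id: str) -> Dict[str, str]:
    calls.append(conn_id)
    return {"conn_id": conn_id}


cache = ConnectionCache(fake_lookup)
threads = [threading.Thread(target=cache.get, args=("my_api",)) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))  # 1: a single lookup despite 100 threads
{code}

The trade-off of any such cache is that edits to the connection in the Airflow UI or metadata database are not picked up until the worker process restarts.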

 

 

> Connection times out on airflow worker
> --------------------------------------
>
>                 Key: AIRFLOW-2946
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2946
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, executors, worker
>    Affects Versions: 1.10.0
>         Environment: ubuntu 16.04, AWS EC2 
>            Reporter: Avik Aggarwal
>            Priority: Critical
>
> Hi,
> I have an Airflow cluster running the Celery executor, with PostgreSQL installed on the same machine as the webserver and scheduler.
> After some time, the remote worker shows the error 'Connection timed out'; Airflow then queues as many tasks as are configured in the pool, and the flow hangs there until the queued tasks are deleted manually after stopping the scheduler service.
>  
> Logs:
> [2018-08-23 13:44:03,954: ERROR/MainProcess] Pool callback raised exception: OperationalError('(psycopg2.OperationalError) could not connect to server: Connection timed out\n\tIs the server running on host "<host>" and accepting\n\tTCP/IP connections on port 5432?\n',)
>  Traceback (most recent call last):
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/billiard/pool.py", line 1747, in safe_apply_callback
>  fun(*args, **kwargs)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/worker/request.py", line 367, in on_failure
>  self.id, exc, request=self, store_result=self.store_errors,
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py", line 157, in mark_as_failure
>  traceback=traceback, request=request)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py", line 322, in store_result
>  request=request, **kwargs)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 53, in _inner
>  return fun(*args, **kwargs)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 105, in _store_result
>  session = self.ResultSession()
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
>  **self.engine_options)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py", line 60, in session_factory
>  self.prepare_models(engine)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py", line 55, in prepare_models
>  ResultModelBase.metadata.create_all(engine)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/schema.py", line 4005, in create_all
>  tables=tables)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1939, in _run_visitor
>  with self._optional_conn_ctx_manager(connection) as conn:
>  File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
>  return self.gen.next()
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1932, in _optional_conn_ctx_manager
>  with self.contextual_connect() as conn:
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2123, in contextual_connect
>  self._wrap_pool_connect(self.pool.connect, None),
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2162, in _wrap_pool_connect
>  e, dialect, self)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception_noconnection
>  exc_info
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 265, in raise_from_cause
>  reraise(type(exception), exception, tb=exc_tb, cause=cause)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2158, in _wrap_pool_connect
>  return fn()
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 403, in connect
>  return _ConnectionFairy._checkout(self)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 791, in _checkout
>  fairy = _ConnectionRecord.checkout(pool)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 532, in checkout
>  rec = pool._do_get()
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 1287, in _do_get
>  return self._create_connection()
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 350, in _create_connection
>  return _ConnectionRecord(self)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 477, in __init__
>  self.__connect(first_connect_check=True)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 674, in __connect
>  connection = pool._invoke_creator(self)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 106, in connect
>  return dialect.connect(*cargs, **cparams)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 412, in connect
>  return self.dbapi.connect(*cargs, **cparams)
>  File "/home/ubuntu/.local/lib/python2.7/site-packages/psycopg2/__init__.py", line 130, in connect
>  conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
>  OperationalError: (psycopg2.OperationalError) could not connect to server: Connection timed out
>  Is the server running on host "<host>" and accepting
>  TCP/IP connections on port 5432?
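The psycopg2 message above means the worker never completed a TCP connection to port 5432 on the database host, which most likely points at the network path between worker and database (for example AWS security group or firewall rules) rather than at pool settings. A quick probe run from the worker machine can help separate the two. This is a minimal standard-library sketch; can_reach is a hypothetical helper, and <host> has to be replaced with the real database host:

{code:python}
import socket


def can_reach(host: str, port: int = 5432, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Only the network path is tested; no Postgres authentication takes place.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # timeouts, refused connections and DNS errors all derive from OSError
        return False


# Run on the worker machine, replacing <host> with the database host:
# print(can_reach("<host>"))
{code}

If the probe fails from the worker but succeeds from the scheduler machine, a network rule is the more likely culprit than Airflow itself.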



--
This message was sent by Atlassian Jira
(v8.3.4#803005)