Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2019/11/19 19:33:00 UTC

[jira] [Closed] (AIRFLOW-6015) Celery needed even if LocalExecutor is configured

     [ https://issues.apache.org/jira/browse/AIRFLOW-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor closed AIRFLOW-6015.
--------------------------------------
    Resolution: Not A Problem

{{airflow worker}} is for running Celery workers -- when you are using the LocalExecutor you do not need to (and shouldn't) run {{airflow worker}}, as the task processes are created as sub-processes of the scheduler.

Doc PR welcome to make this clearer :)
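
As a rough illustration of the point above, here is a minimal Python sketch that reads the {{executor}} option from an airflow.cfg and lists which daemons would need to be started. The helper name {{daemons_for}} and the {{CELERY_EXECUTORS}} set are hypothetical and not part of Airflow; the fallback to SequentialExecutor assumes the 1.10 default. It only parses the config text with the standard library and does not import Airflow.

```python
import configparser

# Hypothetical set, for illustration only: executors whose tasks are run by
# separate Celery worker processes rather than by the scheduler itself.
CELERY_EXECUTORS = {"CeleryExecutor"}


def daemons_for(cfg_text):
    """Return the Airflow 1.10 CLI daemons implied by the [core] executor."""
    # Interpolation is disabled so '%' characters in values are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    # Assumption: SequentialExecutor is used when no executor is configured.
    executor = parser.get("core", "executor", fallback="SequentialExecutor")

    daemons = ["airflow webserver", "airflow scheduler"]
    if executor in CELERY_EXECUTORS:
        # Only Celery-based setups need `airflow worker`; with LocalExecutor
        # the scheduler spawns the task processes itself.
        daemons.append("airflow worker")
    return daemons


sample = """
[core]
executor = LocalExecutor
"""
print(daemons_for(sample))  # ['airflow webserver', 'airflow scheduler']
```

With the configuration quoted in this issue ({{executor = LocalExecutor}}), the sketch lists no worker, which is why the airflow-worker systemd unit can simply be disabled instead of installing apache-airflow[celery].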

> Celery needed even if LocalExecutor is configured
> -------------------------------------------------
>
>                 Key: AIRFLOW-6015
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6015
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: configuration
>    Affects Versions: 1.10.6
>         Environment: ubuntu 16.04
> Python 3.5 with virtualenv
>            Reporter: Mathieu Cinquin
>            Priority: Major
>
> Hello,
>  
> Celery is required to start the airflow worker even if the configured executor is LocalExecutor.
> If I install apache-airflow[celery], the worker daemon starts fine and uses Celery as the executor.
> log
> {code:java}
> Nov 19 19:47:27 airflow-1 systemd[1]: Started Airflow worker daemon.
> Nov 19 19:47:28 airflow-1 bash[10663]: [2019-11-19 19:47:28,226] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=128, max_overflow=10, pool_recycle=600, pid=10668
> Nov 19 19:47:29 airflow-1 bash[10663]: Traceback (most recent call last):
> Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/bin/airflow", line 37, in <module>
> Nov 19 19:47:29 airflow-1 bash[10663]:     args.func(args)
> Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper
> Nov 19 19:47:29 airflow-1 bash[10663]:     return f(*args, **kwargs)
> Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py", line 1075, in worker
> Nov 19 19:47:29 airflow-1 bash[10663]:     from airflow.executors.celery_executor import app as celery_app
> Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", line 27, in <module>
> Nov 19 19:47:29 airflow-1 bash[10663]:     from celery import Celery
> Nov 19 19:47:29 airflow-1 bash[10663]: ImportError: No module named 'celery'
> Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Main process exited, code=exited, status=1/FAILURE
> Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Unit entered failed state.
> Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Failed with result 'exit-code'.
> Nov 19 19:47:40 airflow-1 systemd[1]: airflow-worker.service: Service hold-off time over, scheduling restart.
> Nov 19 19:47:40 airflow-1 systemd[1]: Stopped Airflow worker daemon.
> {code}
>  
> airflow.cfg
>  
> {code:java}
> [core]
> # The home folder for airflow, default is ~/airflow
> # airflow_home = /home/airflow/airflow
> # The folder where your airflow pipelines live, most likely a
> # subfolder in a code repository
> # This path must be absolute
> dags_folder = /home/airflow/airflow/dags
> # The folder where airflow should store its log files
> # This path must be absolute
> base_log_folder = /var/log/airflow
> # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
> # must supply a remote location URL (starting with either 's3://...' or
> # 'gs://...') and an Airflow connection id that provides access to the storage
> # location.
> remote_base_log_folder =
> remote_log_conn_id =
> # Use server-side encryption for logs stored in S3
> encrypt_s3_logs = False
> # DEPRECATED option for remote log storage, use remote_base_log_folder instead!
> s3_log_folder = None
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor
> executor = LocalExecutor
> # The SqlAlchemy connection string to the metadata database.
> # SqlAlchemy supports many different database engine, more information
> # their website
> sql_alchemy_conn = mysql://airflow:xxx@localhost:3306/airflow
> # The SqlAlchemy pool size is the maximum number of database connections
> # in the pool.
> sql_alchemy_pool_size = 128
> # The SqlAlchemy pool recycle is the number of seconds a connection
> # can be idle in the pool before it is invalidated. This config does
> # not apply to sqlite.
> sql_alchemy_pool_recycle = 600
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 4
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 4
> # Are DAGs paused by default at creation
> dags_are_paused_at_creation = True
> # When not using pools, tasks are run in the "default pool",
> # whose size is guided by this config element
> non_pooled_task_slot_count = 128
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 4
> # Whether to load the examples that ship with Airflow. It's good to
> # get started, but you probably want to set this to False in a production
> # environment
> load_examples = False
> # Where your Airflow plugins are stored
> plugins_folder = /home/airflow/airflow/plugins
> # Secret key to save connection passwords in the db
> fernet_key = xxx
> # Whether to disable pickling dags
> donot_pickle = False
> # How long before timing out a python file import while filling the DagBag
> dagbag_import_timeout = 30
> # The class to use for running task instances in a subprocess
> task_runner = StandardTaskRunner
> # If set, tasks without a `run_as_user` argument will be run with this user
> # Can be used to de-elevate a sudo user running Airflow when executing tasks
> default_impersonation =
> # What security module to use (for example kerberos):
> security =
> # Turn unit test mode on (overwrites many configuration options with test
> # values at runtime)
> unit_test_mode = False
> [cli]
> # In what way should the cli access the API. The LocalClient will use the
> # database directly, while the json_client will use the api running on the
> # webserver
> api_client = airflow.api.client.local_client
> endpoint_url = http://localhost:8080
> [api]
> # How to authenticate users of the API
> auth_backend = airflow.api.auth.backend.deny_all
> [operators]
> # The default owner assigned to each new operator, unless
> # provided explicitly or passed via `default_args`
> default_owner = Airflow
> default_cpus = 1
> default_ram = 512
> default_disk = 512
> default_gpus = 0
> [webserver]
> # The base url of your website as airflow cannot guess what domain or
> # cname you are using. This is used in automated emails that
> # airflow sends to point links to the right web server
> base_url = http://airflow-1.example.com:8080
> # The ip specified when starting the web server
> web_server_host = 0.0.0.0
> # The port on which to run the web server
> web_server_port = 8080
> # Paths to the SSL certificate and key for the web server. When both are
> # provided SSL will be enabled. This does not change the web server port.
> #web_server_ssl_cert = /etc/ssl/certs/example.com.crt
> #web_server_ssl_key = /etc/ssl//etc/ssl/private/example.com.key
> # Number of seconds the gunicorn webserver waits before timing out on a worker
> web_server_worker_timeout = 600
> web_server_master_timeout = 600
> # Number of workers to refresh at a time. When set to 0, worker refresh is
> # disabled. When nonzero, airflow periodically refreshes webserver workers by
> # bringing up new ones and killing old ones.
> worker_refresh_batch_size = 0
> # Number of seconds to wait before refreshing a batch of workers.
> worker_refresh_interval = 30
> # Secret key used to run your flask app
> secret_key = xxx
> # Number of workers to run the Gunicorn web server
> workers = 4
> # The worker class gunicorn should use. Choices include
> # sync (default), eventlet, gevent
> worker_class = sync
> # Log files for the gunicorn webserver. '-' means log to stderr.
> access_logfile = -
> error_logfile = -
> rbac = True
> # Expose the configuration file in the web server
> expose_config = False
> # Set to true to turn on authentication:
> # http://pythonhosted.org/airflow/security.html
> #web-authentication
> authenticate = True
> auth_backend = airflow.contrib.auth.backends.password_auth
> # Filter the list of dags by owner name (requires authentication to be enabled)
> filter_by_owner = False
> # Filtering mode. Choices include user (default) and ldapgroup.
> # Ldap group filtering requires using the ldap backend
> #
> # Note that the ldap server needs the "memberOf" overlay to be set up
> # in order to user the ldapgroup mode.
> owner_mode = user
> # Default DAG orientation. Valid values are:
> # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
> dag_orientation = LR
> # Puts the webserver in demonstration mode; blurs the names of Operators for
> # privacy.
> demo_mode = False
> # The amount of time (in secs) webserver will wait for initial handshake
> # while fetching logs from other worker machine
> log_fetch_timeout_sec = 5
> # By default, the webserver shows paused DAGs. Flip this to hide paused
> # DAGs by default
> hide_paused_dags_by_default = False
> [email]
> email_backend = airflow.utils.email.send_email_smtp
> [smtp]
> # If you want airflow to send emails on retries, failure, and you want to use
> # the airflow.utils.email.send_email_smtp function, you have to configure an
> # smtp server here
> smtp_host = localhost
> smtp_starttls = True
> smtp_ssl = False
> # Uncomment and set the user/pass settings if you want to use SMTP AUTH
> smtp_user = airflow
> smtp_password = xxx
> smtp_port = 587
> smtp_mail_from = airflow@airflow-1.example.com
> [schedduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> # after how much time should the scheduler terminate in seconds
> # -1 indicates to run continuously (see also num_runs)
> run_duration = -1
> # after how much time a new DAGs should be picked up from the filesystem
> min_file_process_interval = 0
> dag_dir_list_interval = 300
> # How often should stats be printed to the logs
> print_stats_interval = 30
> child_process_log_directory = /var/log/airflow/scheduler
> # Local task jobs periodically heartbeat to the DB. If the job has
> # not heartbeat in this many seconds, the scheduler will mark the
> # associated task instance as failed and will re-schedule the task.
> scheduler_zombie_task_threshold = 300
> # Turn off scheduler catchup by setting this to False.
> # Default behavior is unchanged and
> # Command Line Backfills still work, but the scheduler
> # will not do scheduler catchup if this is False,
> # however it can be set on a per DAG basis in the
> # DAG definition (catchup)
> catchup_by_default = True
> # The scheduler can run multiple threads in parallel to schedule dags.
> # This defines how many threads will run. However airflow will never
> # use more threads than the amount of cpu cores available.
> max_threads = 2
> authenticate = False
> [admin]
> # UI to hide sensitive variable fields when set to True
> hide_sensitive_variable_fields = True
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)