Posted to dev@airflow.apache.org by Bhavani Ramasamy <vs...@gmail.com> on 2018/09/17 00:56:51 UTC

S3 logs not working with CeleryExecutor

Hello Team,
I am trying to set up S3 logging with the CeleryExecutor (apache-airflow 1.10).
Files are not being written to S3. I have configured airflow.cfg as below:

remote_logging = True

remote_log_conn_id = s3_connection_mine

remote_base_log_folder = s3://mybucket/airflow/logs/


When I try this with the LocalExecutor it works perfectly fine, but not with
the CeleryExecutor. The CeleryExecutor also works fine without S3 logs, but I
want the logs to be in S3. I can see that the celery logging defaults to
log = LoggingMixin().log inside default_celery.py. Can you please help me
configure S3 logging with the CeleryExecutor?


Thanks,

Bhavani
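
A common reason task logs reach S3 under the LocalExecutor but not under the
CeleryExecutor is that the Celery workers run in a different environment and
cannot use the configured connection from there. A quick way to check that
from a worker host is sketched below, using Airflow 1.10's S3Hook; the
connection id and bucket come from the config above, and the key name is just
a placeholder.

    # Rough connectivity check, run on a Celery worker host (illustrative only).
    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id='s3_connection_mine')   # same id as remote_log_conn_id
    print(hook.check_for_bucket('mybucket'))          # True if the worker can see the bucket
    hook.load_string('ping', key='airflow/logs/_connectivity_check',
                     bucket_name='mybucket', replace=True)  # raises if the worker cannot write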

Re: S3 logs not working with CeleryExecutor

Posted by Bhavani Ramasamy <vs...@gmail.com>.

On 2018/09/17 14:12:19, Kyle Hamlin <ha...@gmail.com> wrote: 
> So you need to set the following in your config:
> 
> logging_level=INFO
> remote_logging=True
> logging_config_class=log_config.LOGGING_CONFIG
> task_log_reader=s3.task
> remote_log_conn_id=some_conn
> remote_base_log_folder=s3://some_bucket/foo
> 
> Then create a folder called config next to the dags folder and put a file
> in there called log_config.py. This will give you remote logging for
> tasks.  The scheduler (processor) logs will go to stdout.
> 
> import os
> 
> from airflow import configuration as conf
> 
> LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
> LOG_FORMAT = conf.get('core', 'log_format')
> 
> BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
> PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
> 
> FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
> PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
> 
> S3_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
> 
> LOGGING_CONFIG = {
>     'version': 1,
>     'disable_existing_loggers': False,
>     'formatters': {
>         'airflow.task': {
>             'format': LOG_FORMAT,
>         },
>         'airflow.processor': {
>             'format': LOG_FORMAT,
>         },
>     },
>     'handlers': {
>         'console': {
>             'class': 'logging.StreamHandler',
>             'formatter': 'airflow.task',
>             'stream': 'ext://sys.stdout'
>         },
>         'file.task': {
>             'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
>             'formatter': 'airflow.task',
>             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
>             'filename_template': FILENAME_TEMPLATE,
>         },
>         'file.processor': {
>             'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
>             'formatter': 'airflow.processor',
>             'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
>             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
>         },
>         # When using s3 or gcs, provide a customized LOGGING_CONFIG
>         # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
>         # for details
>         's3.task': {
>             'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
>             'formatter': 'airflow.task',
>             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
>             's3_log_folder': S3_LOG_FOLDER,
>             'filename_template': FILENAME_TEMPLATE,
>         },
>     },
>     'loggers': {
>         '': {
>             'handlers': ['console'],
>             'level': LOG_LEVEL
>         },
>         'airflow': {
>             'handlers': ['console'],
>             'level': LOG_LEVEL,
>             'propagate': False,
>         },
>         'airflow.processor': {
>             'handlers': ['file.processor'],  # MUST CHANGE TO CORRESPONDING HANDLER
>             'level': LOG_LEVEL,
>             'propagate': True,
>         },
>         'airflow.task': {
>             'handlers': ['s3.task'],  # MUST CHANGE TO CORRESPONDING HANDLER
>             'level': LOG_LEVEL,
>             'propagate': False,
>         },
>         'airflow.task_runner': {
>             'handlers': ['s3.task'],  # MUST CHANGE TO CORRESPONDING HANDLER
>             'level': LOG_LEVEL,
>             'propagate': True,
>         },
>     }
> }
> 
> On Sun, Sep 16, 2018 at 8:57 PM Bhavani Ramasamy <vs...@gmail.com>
> wrote:
> 
> > Hello Team,
> > I am trying to setup S3 logging with CeleryExecutor (apache-airflow 1.10).
> > Files are not written to S3. I have configured in airflow.cfg like below,
> >
> > remote_logging = True
> >
> > remote_log_conn_id = s3_connection_mine
> >
> > remote_base_log_folder = s3://mybucket/airflow/logs/
> >
> >
> > when i tried with LocalExecutor, it works perfectly fine but not with
> > CeleryExecutor. Also CeleryExecutor is working fine without S3 logs. But i
> > want the logs to be in S3. I can see the celery logging is defaulted to log
> > = LoggingMixin().log inside default_celery.py. Can you please help me to
> > configure S3 logging with CeleryExecutor.
> >
> >
> > Thanks,
> >
> > Bhavani
> >
> 
> 
> -- 
> Kyle Hamlin
Hello Kyle,

I have tried all of the steps mentioned above, but it didn't help.
> 'airflow.processor': {
>     'handlers': ['file.processor'],  # MUST CHANGE TO CORRESPONDING HANDLER
>     'level': LOG_LEVEL,
>     'propagate': True,
> },
Should I change this file.processor handler to an s3.processor handler by adding the code below?
        's3.processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },

But this throws an error: ValueError: Unable to configure handler 's3.processor': Unable to set formatter 'airflow': 'airflow'
Please advise.
Thanks,
Bhavani
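
The ValueError above is the standard dictConfig complaint when a handler refers
to a formatter key that is not defined in the 'formatters' section of the same
LOGGING_CONFIG; the message suggests that the handler actually loaded points at
a formatter named 'airflow' rather than 'airflow.processor'. A minimal,
self-contained sketch of the mechanism (it only reproduces the shape of the
error, it is not a verified fix for the setup above, and the format string is
arbitrary):

    # The 'demo' handler references a formatter key that does not exist, so
    # dictConfig raises a ValueError about configuring the handler, caused by
    # "Unable to set formatter 'airflow'", the same shape as the error above.
    import logging.config

    BROKEN = {
        'version': 1,
        'formatters': {
            'airflow.processor': {'format': '%(asctime)s %(levelname)s - %(message)s'},
        },
        'handlers': {
            'demo': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow',  # no such key in 'formatters' above
            },
        },
    }

    logging.config.dictConfig(BROKEN)  # raises ValueError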


Re: S3 logs not working with CeleryExecutor

Posted by Kyle Hamlin <ha...@gmail.com>.
So you need to set the following in your config:

logging_level=INFO
remote_logging=True
logging_config_class=log_config.LOGGING_CONFIG
task_log_reader=s3.task
remote_log_conn_id=some_conn
remote_base_log_folder=s3://some_bucket/foo

Then create a folder called config next to the dags folder and put a file
in there called log_config.py. This will give you remote logging for
tasks.  The scheduler (processor) logs will go to stdout.

import os

from airflow import configuration as conf

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],  # MUST CHANGE TO CORRESPONDING HANDLER
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],  # MUST CHANGE TO CORRESPONDING HANDLER
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],  # MUST CHANGE TO CORRESPONDING HANDLER
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
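
If tasks still only log locally with this config in place, it is worth
confirming that log_config is importable exactly the way
logging_config_class=log_config.LOGGING_CONFIG expects, on the scheduler and on
every Celery worker. A small sanity check along those lines, assuming the file
lives in $AIRFLOW_HOME/config (which Airflow 1.10 normally appends to sys.path
at startup) with an empty __init__.py beside it:

    # Quick import check: run with the same environment and AIRFLOW_HOME as the
    # scheduler and the Celery workers. An ImportError here points at a
    # path/packaging problem rather than at the S3 connection.
    from log_config import LOGGING_CONFIG

    # The handler and logger wiring this thread relies on:
    assert 's3.task' in LOGGING_CONFIG['handlers']
    assert LOGGING_CONFIG['loggers']['airflow.task']['handlers'] == ['s3.task']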

On Sun, Sep 16, 2018 at 8:57 PM Bhavani Ramasamy <vs...@gmail.com>
wrote:

> Hello Team,
> I am trying to setup S3 logging with CeleryExecutor (apache-airflow 1.10).
> Files are not written to S3. I have configured in airflow.cfg like below,
>
> remote_logging = True
>
> remote_log_conn_id = s3_connection_mine
>
> remote_base_log_folder = s3://mybucket/airflow/logs/
>
>
> when i tried with LocalExecutor, it works perfectly fine but not with
> CeleryExecutor. Also CeleryExecutor is working fine without S3 logs. But i
> want the logs to be in S3. I can see the celery logging is defaulted to log
> = LoggingMixin().log inside default_celery.py. Can you please help me to
> configure S3 logging with CeleryExecutor.
>
>
> Thanks,
>
> Bhavani
>


-- 
Kyle Hamlin