You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/30 10:49:53 UTC

[GitHub] [airflow] Pravka opened a new issue #15613: Airflow2.0.2

Pravka opened a new issue #15613:
URL: https://github.com/apache/airflow/issues/15613


   Hi, 
   
   I am experiencing issues with reading logs from Elasticsearch, not sure if it's a bug or my incompetence!
   
   **Apache Airflow version**: 2.0.2
   **Elastic version**: v 7.9.3
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): v1.19.6
   **Environment**: Dev Kubernetes
   
   - **Cloud provider or hardware configuration**: AWS 
   - **OS**: Debian GNU/Linux 10 (buster) 
   - **Kernel**: Linux airflow-6d7d4568c-w7plk 4.14.138-rancher #1 SMP Sat Aug 10 11:25:46 UTC 2019 x86_64 GNU/Linux
   - **Install tools**: Kubernetes manifest files -- using Airflow docker image apache/airflow:2.0.2-python3.8-build
   
   **What happened**:
   I am running Airflow with Celery Executor inside Kubernetes cluster which runs Spark jobs via KubernetesPodOperator. I have 2 pods:
   
   ```
   NAME                                                   READY   STATUS             RESTARTS   AGE
   airflow-6d7d4568c-w7plk                                4/4     Running            0          18h
   airflow-worker-5597c8cc8-nlpv9                         2/2     Running            0          18h
   ```
   Airflow pod consists of airflow-ui, airflow-scheduler, airflow-flower and aws-s3-sync container used to sync DAGs from S3.
   Airflow-worker pod consists of airflow-celery-worker and aws-s3-sync containers
   
   For now, I am trying to execute a DAG which runs spark-submit --version using KubernetesPodOperator. DAG executes and logs are present in container stdout. 
   
   I use Filebeat to pick up the logs and enrich them with "add_cloud_metadata" and "add_host_metadata". Afterwards, logs are sent to Logstash for field adjustments as Airflow writes logs to Elasticsearch in one format and tries to read them in other format. This particularly applies for execution_date field. Anyhow, logs are visible in Kibana so I have parsed the fields and assembled log_id field so that Airflow can read it which I confirmed by running a query in console in Kibana. 
   
   Follow up on execution_date field. Seems like when Airflow writes logs to Elasticsearch while running in Kubernetes, fields won't be written to elasticsearch as dag_id, log_id, execution_date and try_number but rather, [kubernetes][labels][dag_id], etc etc. So, if I assemble log_id field manually, using [kubernetes][labels]* fields it turns out example field looks like this:
   ```log_id        spark-submit-spark-submit-2021-04-28T110330.1402290000-3c11bfafa-1```
   which is by default incorrect because, while reading logs, Airflow tries to fetch:
   ```log_id       spark-submit-spark-submit-2021-04-28T11:03:30.140229+00:00-1```
   I am not sure whether this here is something that needs improving or is it expected. IMO, it should not be expected as due to vague documentation with no extensive explanations on what really happens, users have to invest hours in getting to the bottom of the issue and working out a solution on their own. 
   
   After parsing execution_date to be the same as what Airflow tries to fetch, I had to enable fileddata on offset field in elasticsearch as Airflow couldn't sort offsets. After that, the error I sent above happened. 
   
   
   By following Airflow logs while trying to read the log from elasticsearch, below errors pops up:
   ```
   [2021-04-30 09:47:23,421] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
   Traceback (most recent call last):
     File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
       response = self.full_dispatch_request()
     File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
       reraise(exc_type, exc_value, tb)
     File "/root/.local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
       raise value
     File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
       rv = self.dispatch_request()
     File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
       return self.view_functions[rule.endpoint](**req.view_args)
     File "/root/.local/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
       return func(*args, **kwargs)
     File "/root/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
       return f(*args, **kwargs)
     File "/root/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/root/.local/lib/python3.8/site-packages/airflow/www/views.py", line 1068, in get_logs_with_metadata
       logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
     File "/root/.local/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
       logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
     File "/root/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
       log, metadata = self._read(task_instance, try_number_element, metadata)
     File "/root/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 161, in _read
       logs_by_host = self._group_logs_by_host(logs)
     File "/root/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 130, in _group_logs_by_host
       grouped_logs[key].append(log)
   TypeError: unhashable type: 'AttrDict'
   ```
   
   **What you expected to happen**: Airflow UI to display task logs in UI
   
   **How to reproduce it**:
   Spin kubernetes cluster, deploy Airflow with CeleryExecutor in it, use filebeat to pick up logs, send through logstash to elasticsearch. Run any job using KubernetesPodOperator and try to check task logs in Airflow UI. UI task logs view should spin until timeout, then display blank page.
   
   
   **Relevant information/configuration settings**:
   
   **airflow.cfg**:
   ```
   AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: end_of_log
   AIRFLOW__ELASTICSEARCH__FRONTEND: elastic:pass@***.***.svc.cluster.local:443/{log_id}
   AIRFLOW__ELASTICSEARCH__HOST: elastic:pass@***.***.svc.cluster.local:9200
   AIRFLOW__ELASTICSEARCH__JSON_FIELDS: asctime, filename, lineno, levelname, message
   AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
   AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: '{dag_id}-{task_id}-{execution_date}-{try_number}'
   AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
   AIRFLOW__ELASTICSEARCH_CONFIGS__CA_CERTS: /opt/certs/ca.crt
   AIRFLOW__ELASTICSEARCH_CONFIGS__USE_SSL: "True"
   AIRFLOW__ELASTICSEARCH_CONFIGS__VERIFY_CERTS: "True"
   AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
   ```
   **filebeat.yml**:
   ```
       filebeat.autodiscover:
         providers:
           - type: kubernetes
             node: ${NODE_NAME}
             hints.enabled: true
             hints.default_config:
               type: container
               paths:
                 - /var/log/containers/*${data.kubernetes.container.id}.log
       processors:
         - add_cloud_metadata:
         - add_host_metadata:
       
       output.logstash:
         hosts: '***.***.svc.cluster.local:5044'
   ```
   **logstash.conf**:
   ```
       input {
         beats {
           port => 5044
         }
       }
       filter {
         if [kubernetes][labels][dag_id] and [kubernetes][labels][task_id] and [kubernetes][labels][execution_date] and [kubernetes][labels][try_number] {
           mutate {
             gsub => [
               "[kubernetes][labels][execution_date]", "^([0-9a-z][0-9a-z][0-9a-z][0-9a-z]-[0-9a-z][0-9a-z]-[0-9a-z][0-9a-z]T)([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])(.[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z])*([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])(?:-[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z])?", "\1\2:\3:\4\5+\6:\7"
             ]
             add_field => {
               "offset" => "%{[log][offset]}"
               "log_id" => "%{[kubernetes][labels][dag_id]}-%{[kubernetes][labels][task_id]}-%{[kubernetes][labels][execution_date]}-%{[kubernetes][labels][try_number]}"
             }
           }
         }
       }
       output {
         elasticsearch {
           index => "logstash-%{[@metadata][beat]}"
           hosts => [ "https://***.***.svc.cluster.local:9200" ]
           user => "elastic"
           password => "${ES_PASSWORD}"
           cacert => '/etc/logstash/certificates/ca.crt'
         }
       }
   ```
   
   **Final thoughts**:
   Not sure whether I have missed something while setting the thing up following https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/logging.html or Airflow crew needs to work on improving reading logs from elasticsearch. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on issue #15613: Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #15613:
URL: https://github.com/apache/airflow/issues/15613#issuecomment-1003101681


   @jedcunningham Any ideas here -- I know you worked with logstash & ES a bit on a couple of PRs, does it ring a bell?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on issue #15613: Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15613:
URL: https://github.com/apache/airflow/issues/15613#issuecomment-1067274398


   There is also pending PR to improve elastic search logging https://github.com/apache/airflow/pull/21942
   @millin can you confirm if it will also address this issue? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] millin commented on issue #15613: Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch

Posted by GitBox <gi...@apache.org>.
millin commented on issue #15613:
URL: https://github.com/apache/airflow/issues/15613#issuecomment-1068876041


   Seems like I've seen this error about a year ago, but it didn't appear in newer versions.
   @Pravka what versions of Python packages `apache-airflow-providers-elasticsearch`, `elasticsearch`, `elasticsearch-dsl` are installed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on issue #15613: Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15613:
URL: https://github.com/apache/airflow/issues/15613#issuecomment-1067225949


   @Pravka does the issue still happens in latest airflow version?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #15613: Airflow2.0.2

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #15613:
URL: https://github.com/apache/airflow/issues/15613#issuecomment-830011420


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org