Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/13 13:19:38 UTC

[GitHub] [airflow-client-python] Dr0p42 commented on issue #29: Get Dags call is not working.

Dr0p42 commented on issue #29:
URL: https://github.com/apache/airflow-client-python/issues/29#issuecomment-918182699


   👋 hello everyone, I have the same issue with the following:
   
   _docker-compose.yaml_
   ```yaml
   version: "3.9"
   
   x-airflow-common:
     &airflow-common
     # In order to add custom dependencies or upgrade provider packages you can use your extended image.
     # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
      # and uncomment the "build" line below, then run `docker-compose build` to build the images.
     image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
     # build: .
     environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
       AIRFLOW__CORE__FERNET_KEY: ''
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
       AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
       _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
     volumes:
       - ./dags:/opt/airflow/dags
       - ./logs:/opt/airflow/logs
       - ./plugins:/opt/airflow/plugins
     user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
     depends_on:
       redis:
         condition: service_healthy
       postgres:
         condition: service_healthy  
   
   services:
     postgres:
       image: postgres:13
       environment:
         POSTGRES_USER: airflow
         POSTGRES_PASSWORD: airflow
         POSTGRES_DB: airflow
       volumes:
         - postgres-db-volume:/var/lib/postgresql/data
       healthcheck:
         test: ["CMD", "pg_isready", "-U", "airflow"]
         interval: 5s
         retries: 5
       restart: always
   
     redis:
       image: redis:latest
       ports:
         - 6379:6379
       healthcheck:
         test: ["CMD", "redis-cli", "ping"]
         interval: 5s
         timeout: 30s
         retries: 50
       restart: always
   
     airflow-webserver:
       <<: *airflow-common
       command: webserver
       ports:
         - 8080:8080
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
         interval: 10s
         timeout: 10s
         retries: 5
       restart: always
   
     airflow-scheduler:
       <<: *airflow-common
       command: scheduler
       healthcheck:
         test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
         interval: 10s
         timeout: 10s
         retries: 5
       restart: always
   
     airflow-worker:
       <<: *airflow-common
       command: celery worker
       healthcheck:
         test:
           - "CMD-SHELL"
           - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
         interval: 10s
         timeout: 10s
         retries: 5
       restart: always
   
     airflow-init:
       <<: *airflow-common
       entrypoint: /bin/bash
       command:
         - -c
         - |
           function ver() {
             printf "%04d%04d%04d%04d" $${1//./ }
           }
           airflow_version=$$(gosu airflow airflow version)
           airflow_version_comparable=$$(ver $${airflow_version})
           min_airflow_version=2.1.0
            min_airflow_version_comparable=$$(ver $${min_airflow_version})
            if (( airflow_version_comparable < min_airflow_version_comparable )); then
             echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
             echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
             exit 1
           fi
           if [[ -z "${AIRFLOW_UID}" ]]; then
             echo -e "\033[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
             echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
               https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
             exit 1
           fi
           one_meg=1048576
           mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
           cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
           disk_available=$$(df / | tail -1 | awk '{print $$4}')
           warning_resources="false"
           if (( mem_available < 4000 )) ; then
             echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
             echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
             warning_resources="true"
           fi
           if (( cpus_available < 2 )); then
             echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
             echo "At least 2 CPUs recommended. You have $${cpus_available}"
             warning_resources="true"
           fi
           if (( disk_available < one_meg * 10 )); then
             echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
             echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
             warning_resources="true"
           fi
           if [[ $${warning_resources} == "true" ]]; then
             echo
              echo -e "\033[1;33mWARNING!!!: You do not have enough resources to run Airflow (see above)!\e[0m"
             echo "Please follow the instructions to increase amount of resources available:"
             echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
           fi
           mkdir -p /sources/logs /sources/dags /sources/plugins
           chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
           exec /entrypoint airflow version
       environment:
         <<: *airflow-common-env
         _AIRFLOW_DB_UPGRADE: 'true'
         _AIRFLOW_WWW_USER_CREATE: 'true'
         _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
         _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
       user: "0:${AIRFLOW_GID:-0}"
       volumes:
         - .:/sources
   
     flower:
       <<: *airflow-common
       command: celery flower
       ports:
         - 5555:5555
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
         interval: 10s
         timeout: 10s
         retries: 5
       restart: always
   
   volumes:
     postgres-db-volume:
   
   ```
   
   _./dags/tutorial.py_
   ```python
   
   from datetime import timedelta
   from textwrap import dedent
   
   # The DAG object; we'll need this to instantiate a DAG
   from airflow import DAG
   
   # Operators; we need this to operate!
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago
   # These args will get passed on to each operator
   # You can override them on a per-task basis during operator initialization
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
       # 'queue': 'bash_queue',
       # 'pool': 'backfill',
       # 'priority_weight': 10,
       # 'end_date': datetime(2016, 1, 1),
       # 'wait_for_downstream': False,
       # 'dag': dag,
       # 'sla': timedelta(hours=2),
       # 'execution_timeout': timedelta(seconds=300),
       # 'on_failure_callback': some_function,
       # 'on_success_callback': some_other_function,
       # 'on_retry_callback': another_function,
       # 'sla_miss_callback': yet_another_function,
       # 'trigger_rule': 'all_success'
   }
   with DAG(
       'tutorial',
       default_args=default_args,
       description='A simple tutorial DAG',
       schedule_interval=timedelta(days=1),
       start_date=days_ago(2),
       tags=['example'],
   ) as dag:
   
       # t1, t2 and t3 are examples of tasks created by instantiating operators
       t1 = BashOperator(
           task_id='print_date',
           bash_command='date',
       )
   
       t2 = BashOperator(
           task_id='sleep',
           depends_on_past=False,
           bash_command='sleep 5',
           retries=3,
       )
       t1.doc_md = dedent(
           """\
       #### Task Documentation
       You can document your task using the attributes `doc_md` (markdown),
       `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
       rendered in the UI's Task Instance Details page.
       ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
   
       """
       )
   
        dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
        dag.doc_md = """
        This is documentation placed anywhere
        """  # otherwise, type it like this
       templated_command = dedent(
           """
       {% for i in range(5) %}
           echo "{{ ds }}"
           echo "{{ macros.ds_add(ds, 7)}}"
           echo "{{ params.my_param }}"
       {% endfor %}
       """
       )
   
       t3 = BashOperator(
           task_id='templated',
           depends_on_past=False,
           bash_command=templated_command,
           params={'my_param': 'Parameter I passed in'},
       )
   
       t1 >> [t2, t3]
   ```
   
    The code I am running (inside a Jupyter notebook in JupyterLab):
   _notebook_cell_
   ```python
   import time
   import airflow_client.client as client
   from airflow_client.client.api import dag_api
   from airflow_client.client.model.dag_collection import DAGCollection
   from airflow_client.client.model.error import Error
   from pprint import pprint
   
   # The client must configure the authentication and authorization parameters
   # in accordance with the API server security policy.
   # Examples for each auth method are provided below, use the example that
   # satisfies your auth use case.
   
   # Configure HTTP basic authorization: Basic
   configuration = client.Configuration(
       username = 'airflow',
       password = 'airflow',
       host = "http://airflow-webserver:8080/api/v1"
   )
   
   # Enter a context with an instance of the API client
   with client.ApiClient(configuration) as api_client:
       # Create an instance of the API class
       api_instance = dag_api.DAGApi(api_client)
        limit = 100 # int | The number of items to return. (optional) if omitted the server will use the default value of 100
       offset = 0 # int | The number of items to skip before starting to collect the result set. (optional)
        # example passing the optional paging values
        try:
            # List DAGs
            api_response = api_instance.get_dags(limit=limit, offset=offset)
           pprint(api_response)
       except client.ApiException as e:
           print("Exception when calling DAGApi->get_dags: %s\n" % e)
   ```
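
    Connectivity itself is not the problem: the call above does reach the webserver and gets data back, as the traceback below shows. Here is a minimal sanity check with plain `requests` (assuming the notebook container resolves `airflow-webserver` the same way the client configuration above does):
    ```python
    import requests

    # Sanity check: hit the webserver health endpoint from the notebook.
    # Assumes "airflow-webserver" resolves from the notebook container,
    # exactly as in the client Configuration above.
    resp = requests.get("http://airflow-webserver:8080/health")
    print(resp.status_code, resp.json())
    ```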
   
    The error I am getting:
   ```
   ...
   /opt/conda/lib/python3.8/site-packages/airflow_client/client/model_utils.py in get_allof_instances(self, model_args, constant_args)
      1628             composed_instances.append(allof_instance)
      1629         except Exception as ex:
   -> 1630             raise ApiValueError(
      1631                 "Invalid inputs given to generate an instance of '%s'. The "
      1632                 "input data was invalid for the allOf schema '%s' in the composed "
   
   ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']
   ```
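
    As a stopgap I can get a response by turning off the client-side return-type validation. This is only a sketch; it leans on the `_check_return_type` kwarg that the OpenAPI-generated client (the `model_utils.py` in the traceback) appears to accept, so treat it as a workaround rather than a fix:
    ```python
    # Workaround sketch: skip the return-type check so the undeclared "is_active"
    # field in the payload no longer raises ApiValueError. Reuses api_instance,
    # limit, offset and pprint from the cell above; relies on the generated
    # client's _check_return_type kwarg.
    api_response = api_instance.get_dags(
        limit=limit,
        offset=offset,
        _check_return_type=False,
    )
    pprint(api_response)
    ```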
   
    The DAG itself runs fine, judging by the UI:
   <img width="1920" alt="image" src="https://user-images.githubusercontent.com/22365519/133090458-abfa3ac4-5b29-4bc4-aa99-66c5da100a6c.png">
   
    > Dag doesn’t have is_active attribute

    When you query a DAG from the API, it does:
   
   _Postman screenshot_
   <img width="1593" alt="image" src="https://user-images.githubusercontent.com/22365519/133090572-b6d3e126-e505-4dfd-b664-e8b5f4e2e94d.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org