Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/09 14:25:00 UTC

[GitHub] [airflow] lfreina opened a new issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

lfreina opened a new issue #15305:
URL: https://github.com/apache/airflow/issues/15305


   
   **Apache Airflow version**: 2.0.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 
   ```
   kubectl version
   Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.5", GitCommit:"6b1d87acf3c8253c123756b9e61dac642678305f", GitTreeState:"clean", BuildDate:"2021-03-31T15:33:39Z", GoVersion:"go1.15.10", Compiler:"gc", Platform:"linux/amd64"}
   Server Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.0", GitCommit:"e19964183377d0ec2052d1f1fa930c4d7575bd50", GitTreeState:"clean", BuildDate:"2020-08-26T14:23:04Z", GoVersion:"go1.15", Compiler:"gc", Platform:"linux/amd64"}
   ```
   
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS
   - **Others**: Building my own Docker image, following the official documentation, with Python 3.6
   
   ```
   ARG PYTHON_VERSION=3.6
   FROM python:${PYTHON_VERSION}-slim
   
   # install deps
   RUN apt-get update -y && apt-get install -y \
       libczmq-dev \
       libssl-dev \
       inetutils-telnet \
       bind9utils \
       gcc \
       build-essential \
       && apt-get clean
   
   RUN pip install --upgrade pip
   
   RUN pip install "apache-airflow==2.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.0/constraints-3.6.txt"
   
   RUN pip install	psycopg2-binary \
                   apache-airflow-providers-cncf-kubernetes
   
   # host specific configuration used in airflow config file
   COPY airflow_local_settings.py /usr/local/lib/python3.6/airflow_local_settings.py
   
   COPY airflow-test-env-init.sh /tmp/airflow-test-env-init.sh
   
   COPY bootstrap.sh /bootstrap.sh
   RUN chmod +x /bootstrap.sh
   ENTRYPOINT ["/bootstrap.sh"]
   ```
   
   **What happened**:
   
   The tasks are not getting scheduled. To test the scheduler, I created a simple DAG with two DummyOperators:
   
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.dummy import DummyOperator
   
   
   def print_hello():
       return 'Hello world!'
   
   
   def print_goodbye():
       return 'Goodbye world'
   
   
   dag = DAG('hello_world', description='Simple tutorial DAG',
             schedule_interval='0 12 * * *',
             start_date=datetime(2017, 3, 20), catchup=False)
   
   dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
   
   dummy_operator2 = DummyOperator(task_id='dummy_task2', retries=3, dag=dag)
   
   dummy_operator >> dummy_operator2
   ```
   
   I tested the following commands and got different results:
   
   1. **airflow tasks test hello_world dummy_task 2021-01-01** --> No error at all
   
   2. **airflow tasks run hello_world dummy_task 2021-01-01**
   
   ```
   root@airflow-d78b6b585-wfl2q:/# airflow tasks run hello_world dummy_task 2021-01-01
   [2021-04-08 09:15:44,383] {dagbag.py:440} INFO - Filling up the DagBag from /root/airflow/dags
   Running <TaskInstance: hello_world.dummy_task 2021-01-01T00:00:00+00:00 [success]> on host 192.168.4.96
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 234, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 68, in _run_task_by_selected_method
       _run_task_by_executor(args, dag, ti)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 91, in _run_task_by_executor
       executor.start()
     File "/usr/local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 493, in start
       raise AirflowException("Could not get scheduler_job_id")
   airflow.exceptions.AirflowException: Could not get scheduler_job_id
   ```
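
   For context on error 2: per the traceback, `airflow tasks run` without `--raw` hands the task to the configured executor, and in Airflow 2.0.0 the KubernetesExecutor refuses to start unless a scheduler job id has already been set for it, which normally only happens when the scheduler itself starts the executor. Below is a minimal sketch of that guard (simplified and paraphrased for illustration, not the exact Airflow source; the attribute name is an assumption):

   ```
   from airflow.exceptions import AirflowException


   class KubernetesExecutorSketch:
       """Simplified illustration of the guard that raises the error above."""

       def __init__(self):
           # Illustrative attribute: normally populated by the scheduler before
           # it starts the executor. A bare `airflow tasks run` from a shell
           # never sets it, so start() bails out immediately.
           self.scheduler_job_id = None

       def start(self):
           if not self.scheduler_job_id:
               raise AirflowException("Could not get scheduler_job_id")
   ```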
   
   3. **airflow tasks run --raw hello_world dummy_task 2021-01-01**
   
   ```
   root@airflow-d78b6b585-wfl2q:/# airflow tasks run --raw hello_world dummy_task 2021-01-01
   [2021-04-08 09:17:52,261] {dagbag.py:440} INFO - Filling up the DagBag from /root/airflow/dags
   Running <TaskInstance: hello_world.dummy_task 2021-01-01T00:00:00+00:00 [success]> on host 192.168.4.96
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 234, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 66, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 151, in _run_raw_task
       pool=args.pool,
     File "/usr/local/lib/python3.6/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1152, in _run_raw_task
       self._run_mini_scheduler_on_child_tasks(session)
     File "/usr/local/lib/python3.6/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_mini_scheduler_on_child_tasks
       execution_date=self.execution_date,
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3500, in one
       raise orm_exc.NoResultFound("No row was found for one()")
   sqlalchemy.orm.exc.NoResultFound: No row was found for one()
   ```
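
   For context on error 3: per the traceback, after the raw task finishes, `_run_mini_scheduler_on_child_tasks` queries the metadata database filtering by the task's execution_date, and SQLAlchemy's `Query.one()` raises `NoResultFound` because no matching row exists; plausible here, since the task was invoked ad hoc for 2021-01-01 and no DagRun was ever created for that date. A self-contained sketch of that failure mode (the model and column names below are made up for the example, not Airflow's real schema):

   ```
   from sqlalchemy import Column, Integer, String, create_engine
   from sqlalchemy.ext.declarative import declarative_base
   from sqlalchemy.orm import sessionmaker
   from sqlalchemy.orm.exc import NoResultFound

   Base = declarative_base()


   class DagRunStub(Base):
       """Stand-in for the table that the mini scheduler queries."""

       __tablename__ = "dag_run_stub"
       id = Column(Integer, primary_key=True)
       dag_id = Column(String)
       execution_date = Column(String)


   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   session = sessionmaker(bind=engine)()

   try:
       # No row was ever inserted for this ad-hoc execution date, so .one()
       # raises, mirroring the NoResultFound in the traceback above.
       session.query(DagRunStub).filter_by(
           dag_id="hello_world", execution_date="2021-01-01T00:00:00+00:00"
       ).one()
   except NoResultFound:
       print("No row was found for one()")
   ```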
   
   
   **What you expected to happen**: 
   
   Both tasks should run to completion. Instead, only the first task is marked as successful and the downstream task is never scheduled.
   
   I have no idea what went wrong. I already patched the code with https://github.com/apache/airflow/pull/14160/files, which I found in another similar issue.
   
   **How to reproduce it**:
   
   Use minikube with the following setup:
   
   <details><summary>airflow.yaml</summary>
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   
   # Note: The airflow image used in this example is obtained by
   # building the image from the local docker subdirectory.
   
   ---
   apiVersion: v1
   kind: ServiceAccount
   metadata:
     name: airflow
     namespace: airflow-example
   imagePullSecrets:
       - name: regcred
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: Role
   metadata:
     namespace: airflow-example
     name: airflow
   rules:
   - apiGroups: [""] # "" indicates the core API group
     resources: ["pods"]
     verbs: ["get", "list", "watch", "create", "update", "delete"]
   - apiGroups: ["batch", "extensions"]
     resources: ["jobs"]
     verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
   
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRole
   metadata:
     # "namespace" omitted since ClusterRoles are not namespaced
     name: pod-creator
   rules:
     - apiGroups: [""]
       # The resource to access
       resources: ["pods", "pods/log"]
       verbs: ["get", "watch", "list"]
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: RoleBinding
   metadata:
     name: airflow
     namespace: airflow-example
   subjects:
   - kind: ServiceAccount
     name: airflow # Name of the ServiceAccount
     namespace: airflow-example
   roleRef:
     kind: Role # This must be Role or ClusterRole
     name: airflow # This must match the name of the Role or ClusterRole you wish to bind to
     apiGroup: rbac.authorization.k8s.io
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRoleBinding
   metadata:
     name: create-pods
     namespace: airflow-example
   subjects:
     - kind: ServiceAccount
       name: airflow
       namespace: airflow-example
   roleRef:
     kind: ClusterRole
     name: pod-creator
     apiGroup: rbac.authorization.k8s.io
   ---
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: airflow
     namespace: airflow-example
   spec:
     replicas: 1
     template:
       metadata:
         labels:
           name: airflow
       spec:
         serviceAccountName: airflow
         initContainers:
         - name: "init"
           image: 817836881741.dkr.ecr.eu-west-1.amazonaws.com/k8s/airflow:2.0.0-gitlab
           imagePullPolicy: Always
           volumeMounts:
           - name: airflow-configmap
             mountPath: /root/airflow/airflow.cfg
             subPath: airflow.cfg
           - name: airflow-dags
             mountPath: /root/airflow/dags
           env:
           - name: SQL_ALCHEMY_CONN
             valueFrom:
               secretKeyRef:
                 name: airflow-secrets
                 key: sql_alchemy_conn
           command:
             - "bash"
           args:
             - "-cx"
             - "./tmp/airflow-test-env-init.sh" 
         containers:
         - name: webserver
           image: 817836881741.dkr.ecr.eu-west-1.amazonaws.com/k8s/airflow:2.0.0-gitlab
           imagePullPolicy: Always
           ports:
           - name: webserver
             containerPort: 8080
           args: ["webserver"]
           env:
           - name: AIRFLOW_KUBE_NAMESPACE
             valueFrom:
               fieldRef:
                 fieldPath: metadata.namespace
           - name: SQL_ALCHEMY_CONN
             valueFrom:
               secretKeyRef:
                 name: airflow-secrets
                 key: sql_alchemy_conn
           volumeMounts:
           - name: airflow-configmap
             mountPath: /root/airflow/airflow.cfg
             subPath: airflow.cfg
           - name: airflow-dags
             mountPath: /root/airflow/dags
           - name: airflow-logs
             mountPath: /root/airflow/logs
         - name: scheduler
           image: 817836881741.dkr.ecr.eu-west-1.amazonaws.com/k8s/airflow:2.0.0-gitlab
           imagePullPolicy: Always
           args: ["scheduler"]
           env:
           - name: AIRFLOW_KUBE_NAMESPACE
             valueFrom:
               fieldRef:
                 fieldPath: metadata.namespace
           - name: SQL_ALCHEMY_CONN
             valueFrom:
               secretKeyRef:
                 name: airflow-secrets
                 key: sql_alchemy_conn
           volumeMounts:
           - name: airflow-configmap
             mountPath: /root/airflow/airflow.cfg
             subPath: airflow.cfg
           - name: airflow-dags
             mountPath: "/root/airflow/dags"
             subPath:
           - name: airflow-logs
             mountPath: "/root/airflow/logs"
             subPath:
           #- name: airflow-dags
           #  mountPath: /root/airflow/dags
           #- name: airflow-logs
           #  mountPath: /root/airflow/logs
         volumes:
         - name: airflow-dags
           persistentVolumeClaim:
             claimName: airflow-dags
         - name: airflow-logs
           persistentVolumeClaim:
             claimName: airflow-logs
         - name: airflow-configmap
           configMap:
             name: airflow-configmap
     selector:
       matchLabels:
         name: airflow
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: airflow
     namespace: airflow-example
   spec:
     type: NodePort
     ports:
       - protocol: TCP
         # Port accessible inside cluster
         port: 8080
         # Port to forward to inside the pod
         targetPort: 8080
         name: http
     selector:
       name: airflow
    </details>
   
   <details><summary>configmaps.yaml</summary>
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: airflow-configmap
   data:
     airflow.cfg: |
       [logging]
       logging_level = INFO
       base_log_folder = /root/airflow/logs/
       remote_logging = False
   
       [core]
       dags_folder = /root/airflow/dags
       executor = KubernetesExecutor
       parallelism = 32
       # dag_concurrency = 32
       load_examples = False
       plugins_folder = /root/airflow/plugins
       AIRFLOW__CORE__SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
       sql_alchemy_conn = $SQL_ALCHEMY_CONN
       hostname_callable = airflow_local_settings.hostname_callable
       fernet_key = vtQgJBcmi_GXTyOlkzj9J-vQKBxTkQw5We-PJHXcoVc= # airflow-secrets=fernet_key
   
       [metrics]
       statsd_on = False
       statsd_host = localhost
       statsd_port = 8125
       statsd_prefix = airflow
   
       [scheduler]
       dag_dir_list_interval = 300
       child_process_log_directory = /root/airflow/logs/scheduler
       # Task instances listen for external kill signal (when you clear tasks
       # from the CLI or the UI), this defines the frequency at which they should
       # listen (in seconds).
       job_heartbeat_sec = 5
       parsing_processes = 2
   
       # The scheduler constantly tries to trigger new tasks (look at the
       # scheduler section in the docs for more information). This defines
       # how often the scheduler should run (in seconds).
       scheduler_heartbeat_sec = 5
       scheduler_health_check_threshold = 120
   
        # after how much time new DAGs should be picked up from the filesystem
       min_file_process_interval = 0
   
       # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
       min_file_parsing_loop_time = 1
   
       print_stats_interval = 30
       scheduler_zombie_task_threshold = 300
       max_tis_per_query = 0
       max_num_rendered_ti_fields_per_task = 0
       authenticate = False
   
       # Turn off scheduler catchup by setting this to False.
       # Default behavior is unchanged and
       # Command Line Backfills still work, but the scheduler
       # will not do scheduler catchup if this is False,
       # however it can be set on a per DAG basis in the
       # DAG definition (catchup)
       catchup_by_default = True
   
       [webserver]
       # The base url of your website as airflow cannot guess what domain or
       # cname you are using. This is used in automated emails that
       # airflow sends to point links to the right web server
       base_url = http://airflow.monocl.dev
       rbac=True
   
       # The ip specified when starting the web server
       web_server_host = 0.0.0.0
   
       # The port on which to run the web server
       web_server_port = 8080
   
       # Paths to the SSL certificate and key for the web server. When both are
       # provided SSL will be enabled. This does not change the web server port.
       web_server_ssl_cert =
       web_server_ssl_key =
   
       # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
       web_server_master_timeout = 120
   
       # Number of seconds the gunicorn webserver waits before timing out on a worker
       web_server_worker_timeout = 120
   
       # Number of workers to refresh at a time. When set to 0, worker refresh is
       # disabled. When nonzero, airflow periodically refreshes webserver workers by
       # bringing up new ones and killing old ones.
       worker_refresh_batch_size = 1
   
       # Number of seconds to wait before refreshing a batch of workers.
       worker_refresh_interval = 30
   
       # Secret key used to run your flask app
       secret_key = `[cyan]openssl rand -hex 30[/cyan]`
   
       # Number of workers to run the Gunicorn web server
       workers = 4
   
       # The worker class gunicorn should use. Choices include
       # sync (default), eventlet, gevent
       worker_class = sync
   
       # Log files for the gunicorn webserver. '-' means log to stderr.
       access_logfile = -
       error_logfile = -
   
       # Expose the configuration file in the web server
       expose_config = False
   
       # Default DAG view.  Valid values are:
       # tree, graph, duration, gantt, landing_times
       dag_default_view = tree
   
       # Default DAG orientation. Valid values are:
       # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
       dag_orientation = LR
   
       # Puts the webserver in demonstration mode; blurs the names of Operators for
       # privacy.
       demo_mode = False
   
       # The amount of time (in secs) webserver will wait for initial handshake
       # while fetching logs from other worker machine
       log_fetch_timeout_sec = 10
   
       # By default, the webserver shows paused DAGs. Flip this to hide paused
       # DAGs by default
       hide_paused_dags_by_default = False
   
       # Consistent page size across all listing views in the UI
       page_size = 100
   
       [smtp]
       # If you want airflow to send emails on retries, failure, and you want to use
       # the airflow.utils.email.send_email_smtp function, you have to configure an
       # smtp server here
       # smtp_host = smtp.gmail.com
       smtp_starttls = True
       smtp_ssl = False
       # Uncomment and set the user/pass settings if you want to use SMTP AUTH
       # smtp_user = 
       # smtp_password = 
       # smtp_port = 587
       # smtp_mail_from = 
   
       [kubernetes]
       pod_template_file = /root/airflow/dags/airflow_template.yml
       # deprecated
       airflow_configmap = airflow-configmap
       worker_container_repository = 817836881741.dkr.ecr.eu-west-1.amazonaws.com/k8s/airflow
       worker_container_tag = 2.0.0-gitlab
       worker_container_image_pull_policy = Always
       worker_service_account_name = airflow
       namespace = airflow-example
       delete_worker_pods = True
       dags_in_image = False
       git_repo = apache/airflow.git
       git_branch = master
       git_subpath = dags-airflow/
       git_user = luis
       git_password =
       git_sync_root = /git
       git_sync_path = repo
       git_dags_folder_mount_point = 
       dags_volume_claim = airflow-dags
       dags_volume_subpath =
       logs_volume_claim = airflow-logs
       logs_volume_subpath =
       dags_volume_host =
       logs_volume_host =
       in_cluster = True
       gcp_service_account_keys =
   
       # Example affinity and toleration definitions.
       affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
       tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
   
       # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
       git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
       git_sync_container_tag = v2.0.5
       git_sync_init_container_name = git-sync-clone
   
       #    AIRFLOW__KUBERNETES__AIRFLOW_CONFIGMAP = airflow-configmap
       #    AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY = 817836881741.dkr.ecr.eu-west-1.amazonaws.com/k8s/airflow
       #    AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG = 2.0.0-gitlab
       #    AIRFLOW__KUBERNETES__WORKER_CONTAINER_IMAGE_PULL_POLICY = Always
       #    AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME = airflow
       #    AIRFLOW__KUBERNETES__NAMESPACE = airflow-example
       #    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS = True
       #    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM = airflow-dags
       #    AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH =
       #    AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM = airflow-logs
       #    AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH = /root/airflow/logs/
       #    AIRFLOW__KUBERNETES__IN_CLUSTER = True
       # AIRFLOW__CORE__FERNET_KEY = airflow-secrets=fernet_key
   
       [kubernetes_node_selectors]
       # The Key-value pairs to be given to worker pods.
       # The worker pods will be scheduled to the nodes of the specified key-value pairs.
       # Should be supplied in the format: key = value
   
       [kubernetes_annotations]
       # The Key-value annotations pairs to be given to worker pods.
       # Should be supplied in the format: key = value
   
       [hive]
       # Default mapreduce queue for HiveOperator tasks
       default_hive_mapred_queue =
   
       [mysqld]
       explicit_defaults_for_timestamp=1
   
       [celery]
       # This section only applies if you are using the CeleryExecutor in
       # [core] section above
   
       # The app name that will be used by celery
       celery_app_name = airflow.executors.celery_executor
   
       # The concurrency that will be used when starting workers with the
       # "airflow worker" command. This defines the number of task instances that
       # a worker will take, so size up your workers based on the resources on
       # your worker box and the nature of your tasks
       worker_concurrency = 16
   
       # When you start an airflow worker, airflow starts a tiny web server
       # subprocess to serve the workers local log files to the airflow main
       # web server, who then builds pages and sends them to users. This defines
       # the port on which the logs are served. It needs to be unused, and open
       # visible from the main web server to connect into the workers.
       worker_log_server_port = 8793
   
       # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
       # a sqlalchemy database. Refer to the Celery documentation for more
       # information.
       # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
       broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
   
       # The Celery result_backend. When a job finishes, it needs to update the
       # metadata of the job. Therefore it will post a message on a message bus,
       # or insert it into a database (depending of the backend)
       # This status is used by the scheduler to update the state of the task
       # The use of a database is highly recommended
       # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
       result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
   
       # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
       # it `airflow flower`. This defines the IP that Celery Flower runs on
       flower_host = 0.0.0.0
   
       # The root URL for Flower
       # Ex: flower_url_prefix = /flower
       flower_url_prefix =
   
       # This defines the port that Celery Flower runs on
       flower_port = 5555
   
       # Securing Flower with Basic Authentication
       # Accepts user:password pairs separated by a comma
       # Example: flower_basic_auth = user1:password1,user2:password2
       flower_basic_auth =
   
       # Default queue that tasks get assigned to and that worker listen on.
       default_queue = default
   
       # How many processes CeleryExecutor uses to sync task state.
       # 0 means to use max(1, number of cores - 1) processes.
       sync_parallelism = 0
   
       # Import path for celery configuration options
       celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
   
       [celery_broker_transport_options]
       # The visibility timeout defines the number of seconds to wait for the worker
       # to acknowledge the task before the message is redelivered to another worker.
       # Make sure to increase the visibility timeout to match the time of the longest
       # ETA you're planning to use. Especially important in case of using Redis or SQS
       visibility_timeout = 21600
   
       # In case of using SSL
       ssl_active = False
       ssl_key =
       ssl_cert =
       ssl_cacert =
   
       [dask]
       # This section only applies if you are using the DaskExecutor in
       # [core] section above
   
       # The IP address and port of the Dask cluster's scheduler.
       cluster_address = 127.0.0.1:8786
       # TLS/ SSL settings to access a secured Dask scheduler.
       tls_ca =
       tls_cert =
       tls_key =
   
       [ldap]
       # set this to ldaps://<your.ldap.server>:<port>
       uri =
       user_filter = objectClass=*
       user_name_attr = uid
       group_member_attr = memberOf
       superuser_filter =
       data_profiler_filter =
       bind_user = cn=Manager,dc=example,dc=com
       bind_password = insecure
       basedn = dc=example,dc=com
       cacert = /etc/ca/ldap_ca.crt
       search_scope = LEVEL
   
       [kerberos]
       ccache = /tmp/airflow_krb5_ccache
       # gets augmented with fqdn
       principal = airflow
       reinit_frequency = 3600
       kinit_path = kinit
       keytab = airflow.keytab
   
       [cli]
       api_client = airflow.api.client.json_client
       endpoint_url = http://0.0.0.0:8080
   
       [api]
       auth_backend = airflow.api.auth.backend.default
   
       [github_enterprise]
       api_rev = v3
   
       [admin]
       # UI to hide sensitive variable fields when set to True
       hide_sensitive_variable_fields = True
   
       [elasticsearch]
       elasticsearch_host =
   </details>
   <details><summary>postgres.yaml</summary>
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   
   kind: Deployment
   apiVersion: apps/v1
   metadata:
     name: postgres-airflow
   spec:
     replicas: 1
     selector:
       matchLabels:
         name: postgres-airflow
     template:
       metadata:
         labels:
           name: postgres-airflow
       spec:
         restartPolicy: Always
         containers:
           - name: postgres
             image: postgres
             imagePullPolicy: IfNotPresent
             ports:
               - containerPort: 5432
                 protocol: TCP
             volumeMounts:
               - name: dbvol
                 mountPath: /var/lib/postgresql/data/pgdata
                 subPath: pgdata
             env:
               - name: POSTGRES_USER
                 value: root
               - name: POSTGRES_PASSWORD
                 value: root
               - name: POSTGRES_DB
                 value: airflow
               - name: PGDATA
                 value: /var/lib/postgresql/data/pgdata
               - name: POD_IP
                 valueFrom: { fieldRef: { fieldPath: status.podIP } }
             livenessProbe:
               initialDelaySeconds: 60
               timeoutSeconds: 5
               failureThreshold: 5
               exec:
                 command:
                 - /bin/sh
                 - -c
                 - exec pg_isready --host $POD_IP ||  if [[ $(psql -qtAc --host $POD_IP 'SELECT pg_is_in_recovery') != "f" ]]; then  exit 0 else; exit 1; fi
             readinessProbe:
               initialDelaySeconds: 5
               timeoutSeconds: 5
               periodSeconds: 5
               exec:
                 command:
                 - /bin/sh
                 - -c
                 - exec pg_isready --host $POD_IP
             resources:
               requests:
                 memory: .5Gi
                 cpu: .5
         volumes:
           - name: dbvol
             emptyDir: {}
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: postgres-airflow
   spec:
     ports:
       - port: 5432
         targetPort: 5432
     selector:
       name: postgres-airflow
   </details>
   <details><summary>secrets.yaml</summary>
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   apiVersion: v1
   kind: Secret
   metadata:
     name: airflow-secrets
   type: Opaque
   data:
     # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
     # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
     sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
   </details>
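
   For reference, the base64 value of `sql_alchemy_conn` in secrets.yaml above can be regenerated from the plain connection string shown in the comment, e.g. with a couple of lines of Python (note that a shell `echo` without `-n` also picks up a trailing newline, which yields a slightly different encoded value):

   ```
   import base64

   # Plain connection string from the comment in secrets.yaml.
   conn = "postgresql+psycopg2://root:root@postgres-airflow:5432/airflow"
   print(base64.b64encode(conn.encode()).decode())
   ```
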
   <details><summary>volumes.yaml</summary>
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   
   # The backing volume can be anything you want, it just needs to be `ReadWriteOnce`
   # I'm using hostPath since minikube is nice for testing, but any (non-local) volume will work on a real cluster
   kind: PersistentVolume
   apiVersion: v1
   metadata:
     name: airflow-dags
   spec:
     accessModes:
       - ReadWriteMany
     capacity:
       storage: 2Gi
     hostPath:
       path: /airflow-dags/
   ---
   kind: PersistentVolume
   apiVersion: v1
   metadata:
     name: airflow-logs
   spec:
     hostPath:
       path: /airflow-logs/
     accessModes:
       - ReadWriteMany
     capacity:
       storage: 2Gi
   </details>
   <details><summary>deploy.sh</summary>
   #!/usr/bin/env bash
   #
   #  Licensed to the Apache Software Foundation (ASF) under one   *
   #  or more contributor license agreements.  See the NOTICE file *
   #  distributed with this work for additional information        *
   #  regarding copyright ownership.  The ASF licenses this file   *
   #  to you under the Apache License, Version 2.0 (the            *
   #  "License"); you may not use this file except in compliance   *
   #  with the License.  You may obtain a copy of the License at   *
   #                                                               *
   #    http://www.apache.org/licenses/LICENSE-2.0                 *
   #                                                               *
   #  Unless required by applicable law or agreed to in writing,   *
   #  software distributed under the License is distributed on an  *
   #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   #  KIND, either express or implied.  See the License for the    *
   #  specific language governing permissions and limitations      *
   #  under the License.                                           *
   
   set -x
   
   ANSIBLE_IMAGE=$ANSIBLE_IMG
   ANSIBLE_TAG=$ANSIBLE_TG
   AIRFLOW_IMAGE=$IMAGE
   AIRFLOW_TAG=$TAG
   DIRNAME=$(cd "$(dirname "$0")"; pwd)
   
   usage() {
       cat << EOF
     usage: $0 options
     OPTIONS:
       -d Use PersistentVolume or GitSync for dags_folder. Available options are "persistent_mode" or "git_mode"
   EOF
       exit 1;
   }
   
   while getopts ":d:" OPTION; do
     case ${OPTION} in
       d)
         DAGS_VOLUME=${OPTARG};;
       \?)
         echo "Invalid option: -$OPTARG" >&2
         exit 1
         ;;
       :)
         echo "Option -$OPTARG requires an argument." >&2
         usage
         ;;
     esac
   done
   
   case ${DAGS_VOLUME} in
     "persistent_mode")
       GIT_SYNC=0
       ;;
     "git_mode")
       GIT_SYNC=1
       ;;
     *)
       echo "Value \"$DAGS_VOLUME\" for dags_folder is not valid." >&2
       usage
       ;;
   esac
   
   if [ "${GIT_SYNC}" = 0 ]; then
       INIT_DAGS_VOLUME_NAME=airflow-dags
       POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags
       CONFIGMAP_DAGS_FOLDER=/root/airflow/dags
       CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=
       CONFIGMAP_DAGS_VOLUME_CLAIM=airflow-dags
       AIRFLOW__CORE__HOSTNAME_CALLABLE=airflow.hostname_resolver:resolve
   else
       INIT_DAGS_VOLUME_NAME=airflow-dags-fake
       POD_AIRFLOW_DAGS_VOLUME_NAME=airflow-dags-git
       CONFIGMAP_DAGS_FOLDER=/root/airflow/dags/repo/airflow/contrib/example_dags
       CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT=/root/airflow/dags
       CONFIGMAP_DAGS_VOLUME_CLAIM=
   fi
   CONFIGMAP_GIT_REPO=${GIT_REPO:-apache/airflow}
   CONFIGMAP_BRANCH=${GIT_BRANCH:-master}
   
   
   
   # Fix file permissions
   if [[ "${TRAVIS}" == true ]]; then
     sudo chown -R travis.travis $HOME/.kube $HOME/.minikube
   fi
   kubectl delete -f $DIRNAME/postgres.yaml
   kubectl delete -f $DIRNAME/airflow.yaml
   #kubectl delete -f $DIRNAME/secrets.yaml
   kubectl delete secret --ignore-not-found regcred
   
   set -e
   
   kubectl apply -f $DIRNAME/namespace.yaml
   kubectl config set-context --current --namespace=airflow-example
   kubectl apply -f $DIRNAME/secrets.yaml
   kubectl apply -f $DIRNAME/configmaps.yaml
   kubectl apply -f $DIRNAME/postgres.yaml
   sleep 10
   kubectl apply -f $DIRNAME/volumes.yaml
   kubectl apply -f $DIRNAME/airflow.yaml
   
   # wait for up to 10 minutes for everything to be deployed
   PODS_ARE_READY=0
   for i in {1..150}
   do
     echo "------- Running kubectl get pods -------"
     PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
     echo "$PODS"
     NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
     NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
     if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then
       PODS_ARE_READY=1
       break
     fi
     sleep 4
   done
   
   if [[ "$PODS_ARE_READY" == 1 ]]; then
     echo "PODS are ready."
   else
     echo "PODS are not ready after waiting for a long time. Exiting..."
     exit 1
   fi
   </details>
   
   <details><summary>run_this_one.sh</summary>
   export IMAGE=<your CR/image here>
   export TAG=<your tag here>
   
   ./deploy_airflow.sh -d persistent_mode
   </details>
   
   I have already provided the DAG and the Dockerfile above.
   
   
   **Anything else we need to know**:
   It occurs every time with the same output, even when the environment is recreated with minikube.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #15305:
URL: https://github.com/apache/airflow/issues/15305#issuecomment-816720545


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] lfreina edited a comment on issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
lfreina edited a comment on issue #15305:
URL: https://github.com/apache/airflow/issues/15305#issuecomment-849070595


   I have now updated my instance to Airflow 2.0.2 and this bug is now gone. A new one is present though. Thanks for the fix! :+1: 





[GitHub] [airflow] kaxil closed issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
kaxil closed issue #15305:
URL: https://github.com/apache/airflow/issues/15305


   





[GitHub] [airflow] kaxil commented on issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #15305:
URL: https://github.com/apache/airflow/issues/15305#issuecomment-822823060


   Fixed by https://github.com/apache/airflow/pull/14160 and released in Airflow 2.0.2





[GitHub] [airflow] lfreina commented on issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
lfreina commented on issue #15305:
URL: https://github.com/apache/airflow/issues/15305#issuecomment-849070595


   I have now updated my instance to Airflow 2.0.2 and this bug is still present. So I would like to reopen this issue. :+1: 





[GitHub] [airflow] kaxil edited a comment on issue #15305: KubernetesExecutor Airflow 2.0.0 different AirflowExceptions and tasks not getting scheduled

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #15305:
URL: https://github.com/apache/airflow/issues/15305#issuecomment-822823060


   Fixed by https://github.com/apache/airflow/pull/14160 and released in Airflow 2.0.2 (https://pypi.org/project/apache-airflow/2.0.2/)

