You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/28 16:50:07 UTC

[GitHub] [airflow] RenGeng opened a new issue #21190: Airflow launched two different job in one pod

RenGeng opened a new issue #21190:
URL: https://github.com/apache/airflow/issues/21190


   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==6.0.0
   
   ### Apache Airflow version
   
   2.2.1
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   We deploy airflow on GKE with Kubernetes executor, here's GKE version: 
   `Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.15-gke.1801", GitCommit:"12532d9bc117118634f870e6083a51837b34a3e8", GitTreeState:"clean", BuildDate:"2021-10-21T21:27:39Z", GoVersion:"go1.15.15b5", Compiler:"gc", Platform:"linux/amd64"}`
   
   ### What happened
   
   I use SFTPToGCSOperator with the parameter `move_objectbool=True`, so the source file will be deleted in the ftp server, but sometimes I saw two different jobs launched at the same time in one pod, so one of them will delete de file and the other will fail because the file doesn't exist.
   
   ### What you expected to happen
   
   Only launch one job in the pod.
   
   ### How to reproduce
   
   Use SFTPToGCSOperator, but this problem is not recurrent.
   
   ### Anything else
   
   
   ```
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing <Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'move_dag', 'move_task', 'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735927', '--raw', '--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp1pjt_ykp', '--error-file', '/tmp/tmp2lkxj9im']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735927: Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7fde5b957f70>]
   [2022-01-27, 03:09:33 UTC] {settings.py:210} DEBUG - Setting up DB connection pool (PID 55)
   [2022-01-27, 03:09:33 UTC] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> on host movedag.39daa8eb5a
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   [2022-01-27, 03:09:33 UTC] {__init__.py:146} DEBUG - Preparing lineage inlets and outlets
   [2022-01-27, 03:09:33 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': True}
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': True}
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py:61 UserWarning: Failed to load HostKeys from /home/airflow/.ssh/known_hosts.  You will need to explicitly load HostKeys (cnopts.hostkeys.load(filename)) or disableHostKey checking (cnopts.hostkeys = None).
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - starting thread (client mode): 0x3dab54f0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Local version/idstring: SSH-2.0-paramiko_2.8.0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Remote version/idstring: SSH-2.0-OpenSSH_7.4
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Connected (version 2.0, client OpenSSH_7.4)
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex algos:['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha1', 'diffie-hellman-group14-sha256', 'diffie-hellman-group14-sha1', 'diffie-hellman-group1-sha1'] server key:['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ecdsa-sha2-nistp256', 'ssh-ed25519'] client encrypt:['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] server encrypt:['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] client mac:['umac-64-et
 m@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] server mac:['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] client compress:['none', 'zlib@openssh.com'] server compress:['none', 'zlib@openssh.com'] client lang:[''] server lang:[''] kex follows?False
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Kex agreed: curve25519-sha256@libssh.org
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - HostKey agreed: ssh-ed25519
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Cipher agreed: aes128-ctr
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - MAC agreed: hmac-sha2-256
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Compression agreed: zlib@openssh.com
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex engine KexCurve25519 specified hash_algo <built-in function openssl_sha256>
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switch to new keys ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Attempting public-key auth...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - userauth is OK
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Authentication (publickey) successful!
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on outbound compression ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on inbound compression ...
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet in: 32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Received global request "hostkeys-00@openssh.com"
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Rejecting "hostkeys-00@openssh.com" global request from server.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet out: 32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Secsh channel 0 opened.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Sesch channel 0 request ok
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp connection (server version 3)
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] listdir(b'input/PROD/polo/BKG_OFFER')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210630.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210701.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210702.csv')
   .....
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220117.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:175} INFO - Executing copy of input_20220127.csv to gs://XXXXXX/input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] open(b'input_20220127.csv 'rb')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] open(b'input_20220127, 'rb') 000000
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] close(00000000)
   [2022-01-27, 03:09:34 UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2022-01-27, 03:09:34 UTC] {_default.py:206} DEBUG - Checking None for explicit credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:181} DEBUG - Checking Cloud SDK credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:187} DEBUG - Cloud SDK credentials not found on disk; not using them
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET http://169.254.169.254
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/<redacted>
   [2022-01-27, 03:09:34 UTC] {gcs.py:485} INFO - File /tmp/tmpv52p77bg uploaded to input_20220127.csv in XXXX bucket
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} INFO - [chan 0] sftp session closed.
   [2022-01-27, 03:09:34 UTC] {channel.py:1212} DEBUG - [chan 0] EOF sent (0)
   [2022-01-27, 03:09:34 UTC] {__init__.py:107} DEBUG - Lineage called with inlets: [], outlets: []
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {transport.py:1819} DEBUG - EOF in transport thread
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000, start_date=20220127T030932, end_date=20220127T030935
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:2224} DEBUG - Task Duration set to 3.168044
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.126713+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.126713+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   
   [2022-01-27, 03:09:31 UTC] {base_task_runner.py:63} DEBUG - Planning to run as the  user
   [2022-01-27, 03:09:31 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> from DB
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1241} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing <Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'move_dag', 'move_task', 'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735923', '--raw', '--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp_kvghyhi', '--error-file', '/tmp/tmpxlr36n7u']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735923: Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7fc797444f70>]
   [2022-01-27, 03:09:32 UTC] {settings.py:210} DEBUG - Setting up DB connection pool (PID 55)
   [2022-01-27, 03:09:32 UTC] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> on host movedag.68e76697f9
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   .......
   SAME AS BEFORE
   .......
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1703} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
       result = execute_callable(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 144, in execute
       self._copy_single_object(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py", line 251, in delete_file
       conn.remove(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 728, in remove
       self._sftp.remove(remotefile)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 398, in remove
       self._request(CMD_REMOVE, path)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
       return self._read_response(num)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 874, in _read_response
       self._convert_status(msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:2224} DEBUG - Task Duration set to 1.63822
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1270} INFO - Marking task as UP_FOR_RETRY. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000, start_date=20220127T030932, end_date=20220127T030934
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
   [2022-01-27, 03:09:35 UTC] {standard_task_runner.py:88} ERROR - Failed to execute job 735923 for task move_task
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
       args.func(args, dag=self.dag)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
       ti._run_raw_task(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
       result = execute_callable(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 144, in execute
       self._copy_single_object(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py", line 251, in delete_file
       conn.remove(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 728, in remove
       self._sftp.remove(remotefile)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 398, in remove
       self._request(CMD_REMOVE, path)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
       return self._read_response(num)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 874, in _read_response
       self._convert_status(msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.252994+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.252994+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #21190: Airflow launched two different job in one pod

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #21190:
URL: https://github.com/apache/airflow/issues/21190#issuecomment-1024412707


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #21190: Airflow launched two different job in one pod

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #21190:
URL: https://github.com/apache/airflow/issues/21190#issuecomment-1030831008


   Could you please explain where do you see two jobs in one pod? I cannot see it from the log. I think there is some misunderstanding here. Possibly you are mixing "log" with POD (I guess that) but further information is needed:
   * why do you think it was one job in pod
   * what did you expect to happem
   * what is wrong with the log you provided
   
   Please be as precise as possible. I will convert it into discussion until we know whether it is really an issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org