Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 09:37:34 UTC

[GitHub] [airflow] potiuk opened a new issue #21375: Airflow launched two identical tasks in parallel with K8S executor

potiuk opened a new issue #21375:
URL: https://github.com/apache/airflow/issues/21375


   ### Discussed in https://github.com/apache/airflow/discussions/21360
   
   
   Originally posted by **RenGeng** January 28, 2022
   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==6.0.0
   
   ### Apache Airflow version
   
   2.2.1
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   We deploy Airflow on GKE with the Kubernetes executor; here is the GKE version:
   `Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.15-gke.1801", GitCommit:"12532d9bc117118634f870e6083a51837b34a3e8", GitTreeState:"clean", BuildDate:"2021-10-21T21:27:39Z", GoVersion:"go1.15.15b5", Compiler:"gc", Platform:"linux/amd64"}`
   
   ### What happened
   
   I use SFTPToGCSOperator with the parameter `move_object=True`, so the source file is deleted from the SFTP server after the upload. But sometimes I saw two identical jobs launched at the same time for the same task instance (each in its own pod), so one of them deletes the file and the other fails because the file no longer exists.
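
   For context, here is a minimal sketch of the kind of DAG involved. The schedule, source path, bucket name, and retry settings below are illustrative stand-ins inferred from the logs (the real bucket is redacted there), not my exact code:

   ```
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

   with DAG(
       dag_id="move_dag",
       schedule_interval="0 3 * * *",  # daily 03:00 run, matching the scheduled__...T03:00:00 run id
       start_date=datetime(2022, 1, 1),
       catchup=False,
   ) as dag:
       move_task = SFTPToGCSOperator(
           task_id="move_task",
           sftp_conn_id="ftp_dl_talend",  # connection id as it appears in the logs
           source_path="input/PROD/polo/BKG_OFFER/input_*.csv",
           destination_bucket="my-bucket",  # redacted as XXXX in the logs
           move_object=True,  # delete the source file after upload
           retries=3,  # the logs show "attempt 1 of 4", i.e. 3 retries
           retry_delay=timedelta(minutes=2),
       )
   ```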
   
   ### What you expected to happen
   
   Only one job should be launched per task instance, in a single pod.
   
   ### How to reproduce
   
   Use SFTPToGCSOperator with `move_object=True`. The problem is intermittent, so it is not reliably reproducible.
   
   ### Anything else
   
   
   ```
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing <Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'move_dag', 'move_task', 'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735927', '--raw', '--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp1pjt_ykp', '--error-file', '/tmp/tmp2lkxj9im']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735927: Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7fde5b957f70>]
   [2022-01-27, 03:09:33 UTC] {settings.py:210} DEBUG - Setting up DB connection pool (PID 55)
   [2022-01-27, 03:09:33 UTC] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> on host movedag.39daa8eb5a
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   [2022-01-27, 03:09:33 UTC] {__init__.py:146} DEBUG - Preparing lineage inlets and outlets
   [2022-01-27, 03:09:33 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': True}
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': True}
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py:61 UserWarning: Failed to load HostKeys from /home/airflow/.ssh/known_hosts.  You will need to explicitly load HostKeys (cnopts.hostkeys.load(filename)) or disableHostKey checking (cnopts.hostkeys = None).
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - starting thread (client mode): 0x3dab54f0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Local version/idstring: SSH-2.0-paramiko_2.8.0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Remote version/idstring: SSH-2.0-OpenSSH_7.4
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex algos:['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha1', 'diffie-hellman-group14-sha256', 'diffie-hellman-group14-sha1', 'diffie-hellman-group1-sha1'] server key:['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ecdsa-sha2-nistp256', 'ssh-ed25519'] client encrypt:['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] server encrypt:['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] client mac:['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] server mac:['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] client compress:['none', 'zlib@openssh.com'] server compress:['none', 'zlib@openssh.com'] client lang:[''] server lang:[''] kex follows?False
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Kex agreed: curve25519-sha256@libssh.org
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - HostKey agreed: ssh-ed25519
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Cipher agreed: aes128-ctr
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - MAC agreed: hmac-sha2-256
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Compression agreed: zlib@openssh.com
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex engine KexCurve25519 specified hash_algo <built-in function openssl_sha256>
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switch to new keys ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Attempting public-key auth...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - userauth is OK
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Authentication (publickey) successful!
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on outbound compression ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on inbound compression ...
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet in: 32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Received global request "hostkeys-00@openssh.com"
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Rejecting "hostkeys-00@openssh.com" global request from server.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet out: 32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Secsh channel 0 opened.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Sesch channel 0 request ok
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp connection (server version 3)
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] listdir(b'input/PROD/polo/BKG_OFFER')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210630.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210701.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20210702.csv')
   .....
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220117.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:175} INFO - Executing copy of input_20220127.csv to gs://XXXXXX/input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] stat(b'input_20220127.csv')
    [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] open(b'input_20220127.csv', 'rb')
    [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] open(b'input_20220127.csv', 'rb') -> 00000000
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] close(00000000)
   [2022-01-27, 03:09:34 UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2022-01-27, 03:09:34 UTC] {_default.py:206} DEBUG - Checking None for explicit credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:181} DEBUG - Checking Cloud SDK credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:187} DEBUG - Cloud SDK credentials not found on disk; not using them
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET http://169.254.169.254
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/<redacted>
   [2022-01-27, 03:09:34 UTC] {gcs.py:485} INFO - File /tmp/tmpv52p77bg uploaded to input_20220127.csv in XXXX bucket
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} INFO - [chan 0] sftp session closed.
   [2022-01-27, 03:09:34 UTC] {channel.py:1212} DEBUG - [chan 0] EOF sent (0)
   [2022-01-27, 03:09:34 UTC] {__init__.py:107} DEBUG - Lineage called with inlets: [], outlets: []
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {transport.py:1819} DEBUG - EOF in transport thread
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000, start_date=20220127T030932, end_date=20220127T030935
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:2224} DEBUG - Task Duration set to 3.168044
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.126713+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.126713+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   
   [2022-01-27, 03:09:31 UTC] {base_task_runner.py:63} DEBUG - Planning to run as the  user
   [2022-01-27, 03:09:31 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> from DB
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1241} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   --------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing <Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'move_dag', 'move_task', 'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735923', '--raw', '--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp_kvghyhi', '--error-file', '/tmp/tmpxlr36n7u']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735923: Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7fc797444f70>]
   [2022-01-27, 03:09:32 UTC] {settings.py:210} DEBUG - Setting up DB connection pool (PID 55)
   [2022-01-27, 03:09:32 UTC] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> on host movedag.68e76697f9
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941 DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   .......
   SAME AS BEFORE
   .......
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1703} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
       result = execute_callable(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 144, in execute
       self._copy_single_object(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py", line 251, in delete_file
       conn.remove(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 728, in remove
       self._sftp.remove(remotefile)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 398, in remove
       self._request(CMD_REMOVE, path)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
       return self._read_response(num)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 874, in _read_response
       self._convert_status(msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:2224} DEBUG - Task Duration set to 1.63822
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1270} INFO - Marking task as UP_FOR_RETRY. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000, start_date=20220127T030932, end_date=20220127T030934
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
   [2022-01-27, 03:09:35 UTC] {standard_task_runner.py:88} ERROR - Failed to execute job 735923 for task move_task
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
       args.func(args, dag=self.dag)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
       ti._run_raw_task(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
       result = execute_callable(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 144, in execute
       self._copy_single_object(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py", line 251, in delete_file
       conn.remove(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 728, in remove
       self._sftp.remove(remotefile)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 398, in remove
       self._request(CMD_REMOVE, path)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
       return self._read_response(num)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 874, in _read_response
       self._convert_status(msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.252994+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2022-01-27T03:09:36.252994+00:00 and task will be retried at 2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   
   ```
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)





[GitHub] [airflow] potiuk commented on issue #21375: Airflow launched two identical tasks in parallel with K8S executor

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #21375:
URL: https://github.com/apache/airflow/issues/21375#issuecomment-1031572006


   Likely fixed in https://github.com/apache/airflow/pull/19904 - please re-open if you test it on 2.2.3 and the problem is still there.





[GitHub] [airflow] potiuk edited a comment on issue #21375: Airflow launched two identical tasks in parallel with K8S executor

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on issue #21375:
URL: https://github.com/apache/airflow/issues/21375#issuecomment-1031259843


   It seems that in some circumstances the Kubernetes executor might run the same task as two separate pods executing in parallel:
   
   From the discussion #21360
   
   I'm sure it's not the same job. This is the log where it succeeded in deleting the file on the SFTP server:
   ```
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} INFO - [chan 0] sftp session closed.
   [2022-01-27, 03:09:34 UTC] {channel.py:1212} DEBUG - [chan 0] EOF sent (0)
   ```
   
   and this is the log from the second job:
   ```
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1703} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
       result = execute_callable(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 144, in execute
       self._copy_single_object(
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py", line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py", line 251, in delete_file
       conn.remove(path)
     File "/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 728, in remove
       self._sftp.remove(remotefile)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 398, in remove
       self._request(CMD_REMOVE, path)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
       return self._read_response(num)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 874, in _read_response
       self._convert_status(msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   ```
   
   And those lines came from the same log file.
   We run this task every day at 3 AM and it fails from time to time (I would say 3 to 4 times a month).
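
   Until that is fixed, a possible user-side mitigation is to make the delete step idempotent, so the duplicate that loses the race does not fail the task. A minimal sketch, assuming a custom subclass is acceptable (the class name is hypothetical):

   ```
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator


   class TolerantSFTPToGCSOperator(SFTPToGCSOperator):
       """Hypothetical subclass: treat a vanished source file as already moved."""

       def execute(self, context):
           try:
               return super().execute(context)
           except FileNotFoundError:
               # If a parallel duplicate of this task instance already uploaded
               # and removed the source file, the work is done; do not fail.
               # Caveat: this also masks a genuinely missing source file.
               self.log.warning(
                   "Source file disappeared mid-run; assuming a duplicate task instance already moved it."
               )
   ```

   This only keeps the losing duplicate from failing; it does not address the underlying executor race.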
   
   








[GitHub] [airflow] potiuk closed issue #21375: Airflow launched two identical tasks in parallel with K8S executor

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #21375:
URL: https://github.com/apache/airflow/issues/21375


   

