Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/07 04:02:32 UTC
[GitHub] [airflow] david30907d opened a new issue #16847: Marking task as UP_FOR_RETRY but didn't retry
david30907d opened a new issue #16847:
URL: https://github.com/apache/airflow/issues/16847
**Apache Airflow version**: `2.1.1rc1`
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.18
**Environment**:
- **Cloud provider or hardware configuration**: AWS EKS
- **OS** (e.g. from /etc/os-release): `Debian GNU/Linux 10`
- **Kernel** (e.g. `uname -a`): `Linux airflow-worker-1 4.14.232-176.381.amzn2.x86_64 #1 SMP Wed May 19 00:31:54 UTC 2021 x86_64 GNU/Linux`
- **Install tools**: <https://github.com/airflow-helm/charts>
- **Others**:
**What happened**:
The task was actually marked as `failed`, even though the log said it was `up for retry`.
```console
[2021-07-06 22:37:25,624] {local_task_job.py:76} ERROR - Received SIGTERM. Terminating subprocesses
[2021-07-06 22:37:25,626] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 2884
[2021-07-06 22:37:25,626] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-07-06 22:37:25,653] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/repo/dags/v1/sync/sync_segments/udfs/sync_segments.py", line 60, in main
customers = controllers.get_customers_metrics_v2(
File "/opt/airflow/dags/repo/dags/v1/utils/api/controllers.py", line 298, in get_customers_metrics_v2
res = q.all()
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3373, in all
return list(self)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1514, in _handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/cursor.py", line 638, in execute
ret = self._execute_helper(query, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/cursor.py", line 456, in _execute_helper
ret = self._connection.cmd_query(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/connection.py", line 945, in cmd_query
ret = self.rest.request(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 381, in request
return self._post_request(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 629, in _post_request
ret = self.fetch(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 719, in fetch
ret = self._request_exec_wrapper(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 841, in _request_exec_wrapper
raise e
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 762, in _request_exec_wrapper
return_object = self._request_exec(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 1049, in _request_exec
raise err
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/network.py", line 926, in _request_exec
raw_ret = session.request(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/requests/adapters.py", line 439, in send
resp = conn.urlopen(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 699, in urlopen
httplib_response = self._make_request(
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.8/http/client.py", line 1344, in getresponse
response.begin()
File "/usr/local/lib/python3.8/http/client.py", line 307, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.8/http/client.py", line 268, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.8/socket.py", line 669, in readinto
return self._sock.recv_into(b)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 331, in recv_into
if not util.wait_for_read(self.socket, self.socket.gettimeout()):
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 146, in wait_for_read
return wait_for_socket(sock, read=True, timeout=timeout)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 107, in poll_wait_for_socket
return bool(_retry_on_intr(do_poll, timeout))
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 43, in _retry_on_intr
return fn(timeout)
File "/home/airflow/.local/lib/python3.8/site-packages/snowflake/connector/vendored/urllib3/util/wait.py", line 105, in do_poll
return poll_obj.poll(t)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-07-06 22:37:25,658] {taskinstance.py:1544} INFO - Marking task as UP_FOR_RETRY. dag_id=UPDATE_INTERNAL_DAILY_EUROPEZURICH, task_id=SYNC_SEGMENTS_CORE_TASK_1893, execution_date=20210705T220000, start_date=20210706T222237, end_date=20210706T223725
[2021-07-06 22:37:25,718] {process_utils.py:66} INFO - Process psutil.Process(pid=2884, status='terminated', exitcode=1, started='22:22:36') (2884) terminated with exit code 1
```
**What you expected to happen**:
It should retry, or at least trigger the `on_failure_callback` so I know it failed.
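For reference, here is a minimal sketch of the kind of task configuration whose retry and callback settings should be honored in this situation. The DAG id, task id, and callback are hypothetical placeholders, not taken from the real DAG above:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def notify_failure(context):
    # hypothetical callback; the real one might page someone or post to Slack
    print(f"Task {context['task_instance'].task_id} failed")


def sync_segments():
    # placeholder for the real callable that queries Snowflake
    ...


with DAG(
    dag_id="example_retry_dag",  # hypothetical dag_id
    start_date=datetime(2021, 7, 1),
    schedule_interval="@daily",
) as dag:
    PythonOperator(
        task_id="sync_segments_task",  # hypothetical task_id
        python_callable=sync_segments,
        retries=3,  # a SIGTERM'd attempt should end up UP_FOR_RETRY while retries remain
        retry_delay=timedelta(minutes=5),
        on_failure_callback=notify_failure,  # and this should fire once retries are exhausted
    )
```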
**How to reproduce it**:
1. Delete a running worker pod and you will see this behavior: `kubectl delete pods -n airflow airflow-worker-3 --force`
2. Alternatively, run on AWS spot instances or GCP preemptible instances; the Kubernetes nodes get reclaimed by the cloud provider from time to time, which triggers the same issue (a local simulation without Kubernetes is sketched below).
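If you do not have a Kubernetes cluster handy, the same SIGTERM path can likely be exercised locally; a rough sketch, assuming an Airflow 2.1 CLI and a hypothetical dag_id/task_id of your own:
```console
# rough local simulation (hypothetical dag_id/task_id); no Kubernetes needed
airflow tasks run my_dag my_long_running_task 2021-07-05T22:00:00+00:00 --local &
# give the task a moment to start, then terminate it the way a pod eviction would
sleep 10 && kill -TERM %1
```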
**Anything else we need to know**:
This happens every time.
[GitHub] [airflow] david30907d commented on issue #16847: Marking task as UP_FOR_RETRY but didn't retry
Posted by GitBox <gi...@apache.org>.
david30907d commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-877044803
@eejbyfeldt thanks for your fast reply; it's also good to know how to contribute to the Airflow repo.
Will give it a shot over the weekend 😄
[GitHub] [airflow] eejbyfeldt edited a comment on issue #16847: Marking task as UP_FOR_RETRY but didn't retry
Posted by GitBox <gi...@apache.org>.
eejbyfeldt edited a comment on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-876958347
We are also running into this issue. I believe this is a bug introduced in this PR: https://github.com/apache/airflow/pull/15172, meaning that 2.1.0 and 2.1.1 should be affected.
I believe the problematic code is these lines:
https://github.com/apache/airflow/blob/db6acd9e8a91e0eca9e12cace72edc57b2667d25/airflow/jobs/local_task_job.py#L82-L85
which set the task_instance state to `State.FAILED`.
Here is a test case that can be added to `tests/jobs/test_local_task_job.py`; it fails and demonstrates the incorrect behavior:
```python
@pytest.mark.quarantined
def test_process_sigterm_retries(self):
    """
    Test that a task killed with SIGTERM follows the same retry logic
    as normal failures
    """
    dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

    def task_function(ti):
        time.sleep(60)

    task = PythonOperator(
        task_id='test_on_failure',
        python_callable=task_function,
        retries=5,
        dag=dag,
    )
    session = settings.Session()
    dag.clear()
    dag.create_dagrun(
        run_id="test",
        state=State.RUNNING,
        execution_date=DEFAULT_DATE,
        start_date=DEFAULT_DATE,
        session=session,
    )
    ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
    ti.refresh_from_db()
    job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
    settings.engine.dispose()
    process = multiprocessing.Process(target=job1.run)
    process.start()
    # Wait until the task is actually running (and has a PID) before sending SIGTERM
    for _ in range(0, 20):
        ti.refresh_from_db()
        if ti.state == State.RUNNING and ti.pid is not None:
            break
        time.sleep(0.2)
    assert ti.pid is not None
    assert ti.state == State.RUNNING
    os.kill(process.pid, signal.SIGTERM)
    process.join(timeout=10)
    ti.refresh_from_db()
    # The task has retries left, so SIGTERM should leave it UP_FOR_RETRY, not FAILED
    assert ti.state == State.UP_FOR_RETRY
```
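For context on why the test above fails: as described, the linked lines force the task instance straight to `State.FAILED`, so the configured retries and callbacks never get a say. Below is a small, self-contained sketch of that difference (illustrative only; the class and function names are made up and this is not the actual `local_task_job.py` source):
```python
# Illustrative, self-contained sketch - NOT the actual airflow/jobs/local_task_job.py code.
from enum import Enum


class State(str, Enum):
    RUNNING = "running"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"


def terminate_directly(current_state: State) -> State:
    """Roughly what the linked lines do on external termination (per the comment above):
    the state is forced to FAILED, so configured retries are never considered."""
    return State.FAILED if current_state == State.RUNNING else current_state


def terminate_via_failure_handling(try_number: int, max_tries: int) -> State:
    """What the reporter expects instead: the normal failure path (roughly what
    TaskInstance.handle_failure does, as I understand it) checks whether retries
    remain before picking a terminal state."""
    return State.UP_FOR_RETRY if try_number <= max_tries else State.FAILED


# With retries=5, the first SIGTERM should leave the task UP_FOR_RETRY, not FAILED:
assert terminate_directly(State.RUNNING) == State.FAILED
assert terminate_via_failure_handling(try_number=1, max_tries=5) == State.UP_FOR_RETRY
```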
[GitHub] [airflow] potiuk commented on issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-890921851
Closed by #16301
[GitHub] [airflow] potiuk closed issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
potiuk closed issue #16847:
URL: https://github.com/apache/airflow/issues/16847
[GitHub] [airflow] potiuk commented on issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-890921275
> is there any timeline regarding when would this be released? I am using the official Docker images as the base, and couldn't see a release candidate that might include this, but maybe I am looking at the wrong place.
Look at the "milestone" for the issue and the PR. It is scheduled for 2.1.3, which is coming shortly (basically once we fix all the issues visible in the milestone - the release will likely come this week or next).
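For anyone who wants to pick the fix up as soon as it ships, bumping the base image tag should be enough; a sketch, assuming the 2.1.3 tag once it is actually published:
```console
# hypothetical until the 2.1.3 image is published
docker pull apache/airflow:2.1.3
```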
[GitHub] [airflow] ephraimbuddy commented on issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-888387527
This is now fixed by this PR: https://github.com/apache/airflow/pull/16301
Can you verify, @eejbyfeldt?
[GitHub] [airflow] boring-cyborg[bot] commented on issue #16847: Marking task as UP_FOR_RETRY but didn't retry
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-875256438
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] karakanb commented on issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
karakanb commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-890910838
Is there any timeline for when this will be released? I am using the official Docker images as the base and couldn't see a release candidate that might include this, but maybe I am looking in the wrong place.
[GitHub] [airflow] eejbyfeldt commented on issue #16847: Task marked as UP_FOR_RETRY but was not retried
Posted by GitBox <gi...@apache.org>.
eejbyfeldt commented on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-890894159
Yep, the fix looks like it solves the issue to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org