You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 11:12:42 UTC
[GitHub] [airflow] potiuk commented on pull request #13835: Revert "Fix error with quick-failing tasks in KubernetesPodOperator (…
potiuk commented on pull request #13835:
URL: https://github.com/apache/airflow/pull/13835#issuecomment-765329747
```
self = <Task(KubernetesPodOperator): test>, context = {'dag': <DAG: dag>, 'task': <Task(KubernetesPodOperator): test>, 'ti': <TaskInstance: adhoc_airflow.test 2016-01-01 01:00:00+01:00 [None]>, 'ts': '2016-01-01T01:00:00+01:00'}
def execute(self, context) -> Optional[str]:
try:
if self.in_cluster is not None:
client = kube_client.get_kube_client(
in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file,
)
else:
client = kube_client.get_kube_client(
cluster_context=self.cluster_context, config_file=self.config_file
)
self.pod = self.create_pod_request_obj()
self.namespace = self.pod.metadata.namespace
self.client = client
# Add combination of labels to uniquely identify a running pod
labels = self.create_labels_for_pod(context)
label_selector = self._get_pod_identifying_label_string(labels)
self.namespace = self.pod.metadata.namespace
pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
'More than one pod running with labels: '
'{label_selector}'.format(label_selector=label_selector)
)
launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
final_state, result = self.handle_pod_overlap(
> labels, try_numbers_match, launcher, pod_list.items[0]
)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:336:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task(KubernetesPodOperator): test>, labels = {'dag_id': 'dag', 'execution_date': '2016-01-01T0100000100-a2f50a31f', 'task_id': 'test', 'try_number': '1'}, try_numbers_match = False, launcher = <airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
pod = {'api_version': None,
'kind': None,
'metadata': {'annotations': None,
'cluster_name': None,
...rt',
'reason': None,
'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14, tzinfo=tzutc())}}
def handle_pod_overlap(
self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
) -> Tuple[State, Optional[str]]:
"""
In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
this function will either continue to monitor the existing pod or launch a new pod
based on the `reattach_on_restart` parameter.
:param labels: labels used to determine if a pod is repeated
:type labels: dict
:param try_numbers_match: do the try numbers match? Only needed for logging purposes
:type try_numbers_match: bool
:param launcher: PodLauncher
:param pod_list: list of pods found
"""
if try_numbers_match:
log_line = f"found a running pod with labels {labels} and the same try_number."
else:
log_line = f"found a running pod with labels {labels} but a different try_number."
# In case of failed pods, should reattach the first time, but only once
# as the task will have already failed.
if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
log_line += " Will attach to this pod and monitor instead of starting new one"
self.log.info(log_line)
self.pod = pod
> final_state, result = self.monitor_launched_pod(launcher, pod)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:376:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task(KubernetesPodOperator): test>, launcher = <airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
pod = {'api_version': None,
'kind': None,
'metadata': {'annotations': None,
'cluster_name': None,
...rt',
'reason': None,
'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14, tzinfo=tzutc())}}
def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
"""
Monitors a pod to completion that was created by a previous KubernetesPodOperator
:param launcher: pod launcher that will manage launching and monitoring pods
:param pod: podspec used to find pod using k8s API
:return:
"""
try:
(final_state, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)
if final_state != State.SUCCESS:
if self.log_events_on_failure:
for event in launcher.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
self.patch_already_checked(self.pod)
> raise AirflowException(f'Pod returned a failure: {final_state}')
E airflow.exceptions.AirflowException: Pod returned a failure: failed
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:524: AirflowException
During handling of the above exception, another exception occurred:
self = <kubernetes_tests.test_kubernetes_pod_operator.TestKubernetesPodOperatorSystem testMethod=test_reattach_failing_pod_once>
def test_reattach_failing_pod_once(self):
from airflow.utils.state import State
client = kube_client.get_kube_client(in_cluster=False)
name = "test"
namespace = "default"
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["exit 1"],
labels={"foo": "bar"},
name="test",
task_id=name,
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
termination_grace_period=0,
)
context = create_context(k)
with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock:
monitor_mock.return_value = (State.SUCCESS, None)
k.execute(context)
name = k.pod.metadata.name
pod = client.read_namespaced_pod(name=name, namespace=namespace)
while pod.status.phase != "Failed":
pod = client.read_namespaced_pod(name=name, namespace=namespace)
with pytest.raises(AirflowException):
> k.execute(context)
kubernetes_tests/test_kubernetes_pod_operator.py:991:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:346: in execute
self.patch_already_checked(self.pod)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:504: in patch_already_checked
self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16004: in patch_namespaced_pod
(data) = self.patch_namespaced_pod_with_http_info(name, namespace, body, **kwargs) # noqa: E501
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16109: in patch_namespaced_pod_with_http_info
collection_formats=collection_formats)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:345: in call_api
_preload_content, _request_timeout)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:176: in __call_api
_request_timeout=_request_timeout)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:404: in request
body=body)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:298: in PATCH
body=body)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kubernetes.client.rest.RESTClientObject object at 0x7eff6f1b3ad0>, method = 'PATCH', url = 'https://0.0.0.0:19090/api/v1/namespaces/default/pods/test-38fdc44bef524fe5a18143d4155c56be', query_params = []
headers = {'Accept': 'application/json', 'Content-Type': 'application/strategic-merge-patch+json', 'User-Agent': 'OpenAPI-Generator/11.0.0/python'}
body = {'metadata': {'creationTimestamp': '2021-01-22T10:59:14+00:00', 'labels': {'airflow_version': '2.1.0.dev0', 'already_c...c6a871f3236940006ed31091de355578492ed140a39c', 'lastState': {}, ...}], 'hostIP': '172.18.0.2', 'phase': 'Failed', ...}}, post_params = {}, _preload_content = True
_request_timeout = None
def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None, _preload_content=True,
_request_timeout=None):
"""Perform requests.
:param method: http request method
:param url: http request url
:param query_params: query parameters in the url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']
if post_params and body:
raise ValueError(
"body parameter cannot be used with post_params parameter."
)
post_params = post_params or {}
headers = headers or {}
timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, ) if six.PY3 else (int, long)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
if query_params:
url += '?' + urlencode(query_params)
if re.search('json', headers['Content-Type'], re.IGNORECASE):
if headers['Content-Type'] == 'application/json-patch+json':
if not isinstance(body, list):
headers['Content-Type'] = \
'application/strategic-merge-patch+json'
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=False,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=True,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is
# provided in serialized form
elif isinstance(body, str):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
# we need to decode it to string.
if six.PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
> raise ApiException(http_resp=r)
E kubernetes.client.rest.ApiException: (409)
E Reason: Conflict
E HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Fri, 22 Jan 2021 10:59:22 GMT', 'Content-Length': '358'})
E HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Operation cannot be fulfilled on pods \"test-38fdc44bef524fe5a18143d4155c56be\": the object has been modified; please apply your changes to the latest version and try again","reason":"Conflict","details":{"name":"test-38fdc44bef524fe5a18143d4155c56be","kind":"pods"},"code":409}
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:231: ApiException
--------------------------------------------------------------------------------------------------------------------------------------------- Captured stdout call ----------------------------------------------------------------------------------------------------------------------------------------------
[2021-01-22 11:59:14,860] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:14,860] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:15,869] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:15,869] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:16,878] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:16,878] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:17,884] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:17,885] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:18,898] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:18,899] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:19,903] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:19,903] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:20,906] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:20,906] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:21,910] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:21,910] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
[2021-01-22 11:59:21,936] {pod_launcher.py:136} INFO - + exit 1
[2021-01-22 11:59:22,953] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:22,954] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
[2021-01-22 11:59:22,956] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:22,956] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
----------------------------------------------------------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------------------------------------------------------
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:136 + exit 1
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org