You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 11:12:42 UTC

[GitHub] [airflow] potiuk commented on pull request #13835: Revert "Fix error with quick-failing tasks in KubernetesPodOperator (…

potiuk commented on pull request #13835:
URL: https://github.com/apache/airflow/pull/13835#issuecomment-765329747


   ```
   
   self = <Task(KubernetesPodOperator): test>, context = {'dag': <DAG: dag>, 'task': <Task(KubernetesPodOperator): test>, 'ti': <TaskInstance: adhoc_airflow.test 2016-01-01 01:00:00+01:00 [None]>, 'ts': '2016-01-01T01:00:00+01:00'}
   
       def execute(self, context) -> Optional[str]:
           try:
               if self.in_cluster is not None:
                   client = kube_client.get_kube_client(
                       in_cluster=self.in_cluster,
                       cluster_context=self.cluster_context,
                       config_file=self.config_file,
                   )
               else:
                   client = kube_client.get_kube_client(
                       cluster_context=self.cluster_context, config_file=self.config_file
                   )
       
               self.pod = self.create_pod_request_obj()
               self.namespace = self.pod.metadata.namespace
       
               self.client = client
       
               # Add combination of labels to uniquely identify a running pod
               labels = self.create_labels_for_pod(context)
       
               label_selector = self._get_pod_identifying_label_string(labels)
       
               self.namespace = self.pod.metadata.namespace
       
               pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
       
               if len(pod_list.items) > 1 and self.reattach_on_restart:
                   raise AirflowException(
                       'More than one pod running with labels: '
                       '{label_selector}'.format(label_selector=label_selector)
                   )
       
               launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
       
               if len(pod_list.items) == 1:
                   try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
                   final_state, result = self.handle_pod_overlap(
   >                   labels, try_numbers_match, launcher, pod_list.items[0]
                   )
   
   airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:336: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   self = <Task(KubernetesPodOperator): test>, labels = {'dag_id': 'dag', 'execution_date': '2016-01-01T0100000100-a2f50a31f', 'task_id': 'test', 'try_number': '1'}, try_numbers_match = False, launcher = <airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
   pod = {'api_version': None,
    'kind': None,
    'metadata': {'annotations': None,
                 'cluster_name': None,
             ...rt',
               'reason': None,
               'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14, tzinfo=tzutc())}}
   
       def handle_pod_overlap(
           self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
       ) -> Tuple[State, Optional[str]]:
           """
       
           In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
           this function will either continue to monitor the existing pod or launch a new pod
           based on the `reattach_on_restart` parameter.
       
           :param labels: labels used to determine if a pod is repeated
           :type labels: dict
           :param try_numbers_match: do the try numbers match? Only needed for logging purposes
           :type try_numbers_match: bool
           :param launcher: PodLauncher
           :param pod_list: list of pods found
           """
           if try_numbers_match:
               log_line = f"found a running pod with labels {labels} and the same try_number."
           else:
               log_line = f"found a running pod with labels {labels} but a different try_number."
       
           # In case of failed pods, should reattach the first time, but only once
           # as the task will have already failed.
           if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
               log_line += " Will attach to this pod and monitor instead of starting new one"
               self.log.info(log_line)
               self.pod = pod
   >           final_state, result = self.monitor_launched_pod(launcher, pod)
   
   airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:376: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   self = <Task(KubernetesPodOperator): test>, launcher = <airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
   pod = {'api_version': None,
    'kind': None,
    'metadata': {'annotations': None,
                 'cluster_name': None,
             ...rt',
               'reason': None,
               'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14, tzinfo=tzutc())}}
   
       def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
           """
           Monitors a pod to completion that was created by a previous KubernetesPodOperator
       
           :param launcher: pod launcher that will manage launching and monitoring pods
           :param pod: podspec used to find pod using k8s API
           :return:
           """
           try:
               (final_state, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
           finally:
               if self.is_delete_operator_pod:
                   launcher.delete_pod(pod)
           if final_state != State.SUCCESS:
               if self.log_events_on_failure:
                   for event in launcher.read_pod_events(pod).items:
                       self.log.error("Pod Event: %s - %s", event.reason, event.message)
               self.patch_already_checked(self.pod)
   >           raise AirflowException(f'Pod returned a failure: {final_state}')
   E           airflow.exceptions.AirflowException: Pod returned a failure: failed
   
   airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:524: AirflowException
   
   During handling of the above exception, another exception occurred:
   
   self = <kubernetes_tests.test_kubernetes_pod_operator.TestKubernetesPodOperatorSystem testMethod=test_reattach_failing_pod_once>
   
       def test_reattach_failing_pod_once(self):
           from airflow.utils.state import State
       
           client = kube_client.get_kube_client(in_cluster=False)
           name = "test"
           namespace = "default"
           k = KubernetesPodOperator(
               namespace='default',
               image="ubuntu:16.04",
               cmds=["bash", "-cx"],
               arguments=["exit 1"],
               labels={"foo": "bar"},
               name="test",
               task_id=name,
               in_cluster=False,
               do_xcom_push=False,
               is_delete_operator_pod=False,
               termination_grace_period=0,
           )
       
           context = create_context(k)
       
           with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock:
               monitor_mock.return_value = (State.SUCCESS, None)
               k.execute(context)
               name = k.pod.metadata.name
               pod = client.read_namespaced_pod(name=name, namespace=namespace)
               while pod.status.phase != "Failed":
                   pod = client.read_namespaced_pod(name=name, namespace=namespace)
           with pytest.raises(AirflowException):
   >           k.execute(context)
   
   kubernetes_tests/test_kubernetes_pod_operator.py:991: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:346: in execute
       self.patch_already_checked(self.pod)
   airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:504: in patch_already_checked
       self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16004: in patch_namespaced_pod
       (data) = self.patch_namespaced_pod_with_http_info(name, namespace, body, **kwargs)  # noqa: E501
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16109: in patch_namespaced_pod_with_http_info
       collection_formats=collection_formats)
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:345: in call_api
       _preload_content, _request_timeout)
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:176: in __call_api
       _request_timeout=_request_timeout)
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:404: in request
       body=body)
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:298: in PATCH
       body=body)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   self = <kubernetes.client.rest.RESTClientObject object at 0x7eff6f1b3ad0>, method = 'PATCH', url = 'https://0.0.0.0:19090/api/v1/namespaces/default/pods/test-38fdc44bef524fe5a18143d4155c56be', query_params = []
   headers = {'Accept': 'application/json', 'Content-Type': 'application/strategic-merge-patch+json', 'User-Agent': 'OpenAPI-Generator/11.0.0/python'}
   body = {'metadata': {'creationTimestamp': '2021-01-22T10:59:14+00:00', 'labels': {'airflow_version': '2.1.0.dev0', 'already_c...c6a871f3236940006ed31091de355578492ed140a39c', 'lastState': {}, ...}], 'hostIP': '172.18.0.2', 'phase': 'Failed', ...}}, post_params = {}, _preload_content = True
   _request_timeout = None
   
       def request(self, method, url, query_params=None, headers=None,
                   body=None, post_params=None, _preload_content=True,
                   _request_timeout=None):
           """Perform requests.
       
           :param method: http request method
           :param url: http request url
           :param query_params: query parameters in the url
           :param headers: http request headers
           :param body: request json body, for `application/json`
           :param post_params: request post parameters,
                               `application/x-www-form-urlencoded`
                               and `multipart/form-data`
           :param _preload_content: if False, the urllib3.HTTPResponse object will
                                    be returned without reading/decoding response
                                    data. Default is True.
           :param _request_timeout: timeout setting for this request. If one
                                    number provided, it will be total request
                                    timeout. It can also be a pair (tuple) of
                                    (connection, read) timeouts.
           """
           method = method.upper()
           assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
                             'PATCH', 'OPTIONS']
       
           if post_params and body:
               raise ValueError(
                   "body parameter cannot be used with post_params parameter."
               )
       
           post_params = post_params or {}
           headers = headers or {}
       
           timeout = None
           if _request_timeout:
               if isinstance(_request_timeout, (int, ) if six.PY3 else (int, long)):  # noqa: E501,F821
                   timeout = urllib3.Timeout(total=_request_timeout)
               elif (isinstance(_request_timeout, tuple) and
                     len(_request_timeout) == 2):
                   timeout = urllib3.Timeout(
                       connect=_request_timeout[0], read=_request_timeout[1])
       
           if 'Content-Type' not in headers:
               headers['Content-Type'] = 'application/json'
       
           try:
               # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
               if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
                   if query_params:
                       url += '?' + urlencode(query_params)
                   if re.search('json', headers['Content-Type'], re.IGNORECASE):
                       if headers['Content-Type'] == 'application/json-patch+json':
                           if not isinstance(body, list):
                               headers['Content-Type'] = \
                                   'application/strategic-merge-patch+json'
                       request_body = None
                       if body is not None:
                           request_body = json.dumps(body)
                       r = self.pool_manager.request(
                           method, url,
                           body=request_body,
                           preload_content=_preload_content,
                           timeout=timeout,
                           headers=headers)
                   elif headers['Content-Type'] == 'application/x-www-form-urlencoded':  # noqa: E501
                       r = self.pool_manager.request(
                           method, url,
                           fields=post_params,
                           encode_multipart=False,
                           preload_content=_preload_content,
                           timeout=timeout,
                           headers=headers)
                   elif headers['Content-Type'] == 'multipart/form-data':
                       # must del headers['Content-Type'], or the correct
                       # Content-Type which generated by urllib3 will be
                       # overwritten.
                       del headers['Content-Type']
                       r = self.pool_manager.request(
                           method, url,
                           fields=post_params,
                           encode_multipart=True,
                           preload_content=_preload_content,
                           timeout=timeout,
                           headers=headers)
                   # Pass a `string` parameter directly in the body to support
                   # other content types than Json when `body` argument is
                   # provided in serialized form
                   elif isinstance(body, str):
                       request_body = body
                       r = self.pool_manager.request(
                           method, url,
                           body=request_body,
                           preload_content=_preload_content,
                           timeout=timeout,
                           headers=headers)
                   else:
                       # Cannot generate the request from given parameters
                       msg = """Cannot prepare a request message for provided
                                arguments. Please check that your arguments match
                                declared content type."""
                       raise ApiException(status=0, reason=msg)
               # For `GET`, `HEAD`
               else:
                   r = self.pool_manager.request(method, url,
                                                 fields=query_params,
                                                 preload_content=_preload_content,
                                                 timeout=timeout,
                                                 headers=headers)
           except urllib3.exceptions.SSLError as e:
               msg = "{0}\n{1}".format(type(e).__name__, str(e))
               raise ApiException(status=0, reason=msg)
       
           if _preload_content:
               r = RESTResponse(r)
       
               # In the python 3, the response.data is bytes.
               # we need to decode it to string.
               if six.PY3:
                   r.data = r.data.decode('utf8')
       
               # log response body
               logger.debug("response body: %s", r.data)
       
           if not 200 <= r.status <= 299:
   >           raise ApiException(http_resp=r)
   E           kubernetes.client.rest.ApiException: (409)
   E           Reason: Conflict
   E           HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Fri, 22 Jan 2021 10:59:22 GMT', 'Content-Length': '358'})
   E           HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Operation cannot be fulfilled on pods \"test-38fdc44bef524fe5a18143d4155c56be\": the object has been modified; please apply your changes to the latest version and try again","reason":"Conflict","details":{"name":"test-38fdc44bef524fe5a18143d4155c56be","kind":"pods"},"code":409}
   
   .build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:231: ApiException
   --------------------------------------------------------------------------------------------------------------------------------------------- Captured stdout call ----------------------------------------------------------------------------------------------------------------------------------------------
   [2021-01-22 11:59:14,860] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:14,860] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:15,869] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:15,869] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:16,878] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:16,878] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:17,884] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:17,885] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:18,898] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:18,899] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:19,903] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:19,903] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:20,906] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   [2021-01-22 11:59:20,906] {pod_launcher.py:113} WARNING - Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   [2021-01-22 11:59:21,910] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   [2021-01-22 11:59:21,910] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   [2021-01-22 11:59:21,936] {pod_launcher.py:136} INFO - + exit 1
   [2021-01-22 11:59:22,953] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   [2021-01-22 11:59:22,954] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   [2021-01-22 11:59:22,956] {pod_launcher.py:176} INFO - Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   [2021-01-22 11:59:22,956] {pod_launcher.py:286} ERROR - Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   ----------------------------------------------------------------------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------------------------------------------------------------------
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
   WARNING  airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod not yet started: test-38fdc44bef524fe5a18143d4155c56be
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   ERROR    airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:136 + exit 1
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   ERROR    airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   INFO     airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176 Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
   ERROR    airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286 Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org