You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Pradip Thachile <pr...@thachile.com> on 2020/06/06 05:38:18 UTC

Occasional RPC DEADLINE_EXCEEDED with PubSub source on Python/DirectRunner

Hi,

I'm testing a pipeline using Python Beam and the DirectRunner that reads from a Pub/Sub subscription however after "some time" (i.e. I don't quite see a predictable pattern yet) I get a bundle failure exception that flags a failure with a gRPC call to Pub/Sub:

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x120aaf550>, due to an exception.
 Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1591255982.041826000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}"

This then causes a number of other knock-on exceptions (shown farther below). When this happens, looking at the subscription its clear that the number of unacked/old messages both start increasing and there's a long ensuing delay until messages are output by downstream GBK/Combine operations. Not really sure how to begin debugging this and would love to get the communities feedback.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 381, in call
    finish_state)
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 421, in attempt_call
    result = evaluator.finish_bundle()
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 574, in finish_bundle
    data = self._read_from_pubsub(self.source.timestamp_attribute)
  File "<redacted venv path>/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 562, in _read_from_pubsub
    return_immediately=True)
  File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "<redacted venv path>/lib/python3.7/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 1005, in pull
    request, retry=retry, timeout=timeout, metadata=metadata
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded

ERROR:grpc._plugin_wrapping:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x12093c1d0>" raised exception!
Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 381, in _make_request
    self._validate_conn(conn)
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 976, in _validate_conn
    conn.connect()
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 308, in connect
    conn = self._new_conn()
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "<redacted venv path>/lib/python3.7/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 181, in __call__
    method, url, data=body, headers=headers, timeout=timeout, **kwargs
  File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "<redacted venv path>/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<redacted venv path>/lib/python3.7/site-packages/grpc/_plugin_wrapping.py", line 78, in __call__
    context, _AuthMetadataPluginCallback(callback_state, callback))
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 84, in __call__
    callback(self._get_authorization_headers(context), None)
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/grpc.py", line 71, in _get_authorization_headers
    self._request, context.method_name, context.service_url, headers
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/credentials.py", line 124, in before_request
    self.refresh(request)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/service_account.py", line 334, in refresh
    access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 153, in jwt_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "<redacted venv path>/lib/python3.7/site-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
    response = request(method="POST", url=token_uri, headers=headers, body=body)
  File "<redacted venv path>/lib/python3.7/site-packages/google/auth/transport/requests.py", line 186, in __call__
    six.raise_from(new_exc, caught_exc)
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x120257d90>: Failed to establish a new connection: [Errno 60] Operation timed out'))

-Pradip