You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Ning Kang <ni...@google.com> on 2020/03/02 22:34:50 UTC
Error logging from fn_api_runners
Hi,
I just observed some error level loggings like these:
```
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
{'worker_5':
<apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
{'worker_5':
<apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
{'worker_5':
<apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
{'worker_5':
<apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
at 0x127fdaa58>}
```
It's coming from this PR
<https://github.com/apache/beam/commit/921a9a83a156eb6af500ace1e44b91821ad426b1#diff-adf22e1eb39b451e73d4541052a33686R2015>
.
```
def get_worker_handlers(
self,
environment_id, # type: Optional[str]
num_workers # type: int
):
# type: (...) -> List[WorkerHandler]
if environment_id is None:
# Any environment will do, pick one arbitrarily.
environment_id = next(iter(self._environments.keys()))
environment = self._environments[environment_id]
# assume all environments except EMBEDDED_PYTHON use gRPC.
if environment.urn == python_urns.EMBEDDED_PYTHON:
# special case for EmbeddedWorkerHandler: there's no need for a gRPC
# server, but to pass the type check on WorkerHandler.create() we
# make like we have a GrpcServer instance.
self._grpc_server = cast(GrpcServer, None)
elif self._grpc_server is None:
self._grpc_server = GrpcServer(
self._state, self._job_provision_info, self)
worker_handler_list = self._cached_handlers[environment_id]
if len(worker_handler_list) < num_workers:
for _ in range(len(worker_handler_list), num_workers):
worker_handler = WorkerHandler.create(
environment,
self._state,
self._job_provision_info,
self._grpc_server)
_LOGGER.info(
"Created Worker handler %s for environment %s",
worker_handler,
environment)
self._cached_handlers[environment_id].append(worker_handler)
self._workers_by_id[worker_handler.worker_id] = worker_handler
worker_handler.start_worker()
_LOGGER.error("created %s workers %s", num_workers, self._workers_by_id)
return self._cached_handlers[environment_id][:num_workers]
```
Is this supposed to be an info level logging?
Thanks!
Ning.
Re: Error logging from fn_api_runners
Posted by Robert Bradshaw <ro...@google.com>.
Yeah, this was an oversight on my part. I don't think we need to log
this at all. https://github.com/apache/beam/pull/11021 for anyone to
look at.
On Mon, Mar 2, 2020 at 2:44 PM Heejong Lee <he...@google.com> wrote:
>
> I think it should be either info or debug but not error.
>
> On Mon, Mar 2, 2020 at 2:35 PM Ning Kang <ni...@google.com> wrote:
>>
>> Hi,
>>
>> I just observed some error level loggings like these:
>> ```
>> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
>> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
>> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
>> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
>> ```
>> It's coming from this PR.
>> ```
>>
>> def get_worker_handlers(
>> self,
>> environment_id, # type: Optional[str]
>> num_workers # type: int
>> ):
>> # type: (...) -> List[WorkerHandler]
>> if environment_id is None:
>> # Any environment will do, pick one arbitrarily.
>> environment_id = next(iter(self._environments.keys()))
>> environment = self._environments[environment_id]
>>
>> # assume all environments except EMBEDDED_PYTHON use gRPC.
>> if environment.urn == python_urns.EMBEDDED_PYTHON:
>> # special case for EmbeddedWorkerHandler: there's no need for a gRPC
>> # server, but to pass the type check on WorkerHandler.create() we
>> # make like we have a GrpcServer instance.
>> self._grpc_server = cast(GrpcServer, None)
>> elif self._grpc_server is None:
>> self._grpc_server = GrpcServer(
>> self._state, self._job_provision_info, self)
>>
>> worker_handler_list = self._cached_handlers[environment_id]
>> if len(worker_handler_list) < num_workers:
>> for _ in range(len(worker_handler_list), num_workers):
>> worker_handler = WorkerHandler.create(
>> environment,
>> self._state,
>> self._job_provision_info,
>> self._grpc_server)
>> _LOGGER.info(
>> "Created Worker handler %s for environment %s",
>> worker_handler,
>> environment)
>> self._cached_handlers[environment_id].append(worker_handler)
>> self._workers_by_id[worker_handler.worker_id] = worker_handler
>> worker_handler.start_worker()
>> _LOGGER.error("created %s workers %s", num_workers, self._workers_by_id)
>> return self._cached_handlers[environment_id][:num_workers]
>>
>> ```
>> Is this supposed to be an info level logging?
>>
>> Thanks!
>>
>> Ning.
Re: Error logging from fn_api_runners
Posted by Heejong Lee <he...@google.com>.
I think it should be either info or debug but not error.
On Mon, Mar 2, 2020 at 2:35 PM Ning Kang <ni...@google.com> wrote:
> Hi,
>
> I just observed some error level loggings like these:
> ```
> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
> {'worker_5':
> <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
> at 0x127fdaa58>}
> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
> {'worker_5':
> <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
> at 0x127fdaa58>}
> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
> {'worker_5':
> <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
> at 0x127fdaa58>}
> ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers
> {'worker_5':
> <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object
> at 0x127fdaa58>}
> ```
> It's coming from this PR
> <https://github.com/apache/beam/commit/921a9a83a156eb6af500ace1e44b91821ad426b1#diff-adf22e1eb39b451e73d4541052a33686R2015>
> .
> ```
>
> def get_worker_handlers(
> self,
> environment_id, # type: Optional[str]
> num_workers # type: int
> ):
> # type: (...) -> List[WorkerHandler]
> if environment_id is None:
> # Any environment will do, pick one arbitrarily.
> environment_id = next(iter(self._environments.keys()))
> environment = self._environments[environment_id]
>
> # assume all environments except EMBEDDED_PYTHON use gRPC.
> if environment.urn == python_urns.EMBEDDED_PYTHON:
> # special case for EmbeddedWorkerHandler: there's no need for a gRPC
> # server, but to pass the type check on WorkerHandler.create() we
> # make like we have a GrpcServer instance.
> self._grpc_server = cast(GrpcServer, None)
> elif self._grpc_server is None:
> self._grpc_server = GrpcServer(
> self._state, self._job_provision_info, self)
>
> worker_handler_list = self._cached_handlers[environment_id]
> if len(worker_handler_list) < num_workers:
> for _ in range(len(worker_handler_list), num_workers):
> worker_handler = WorkerHandler.create(
> environment,
> self._state,
> self._job_provision_info,
> self._grpc_server)
> _LOGGER.info(
> "Created Worker handler %s for environment %s",
> worker_handler,
> environment)
> self._cached_handlers[environment_id].append(worker_handler)
> self._workers_by_id[worker_handler.worker_id] = worker_handler
> worker_handler.start_worker()
> _LOGGER.error("created %s workers %s", num_workers, self._workers_by_id)
> return self._cached_handlers[environment_id][:num_workers]
>
> ```
> Is this supposed to be an info level logging?
>
> Thanks!
>
> Ning.
>