Posted to dev@beam.apache.org by Ning Kang <ni...@google.com> on 2020/03/02 22:34:50 UTC

Error logging from fn_api_runners

Hi,

I just observed some error-level log messages like these:
```
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
ERROR:apache_beam.runners.portability.fn_api_runner:created 1 workers {'worker_5': <apache_beam.runners.portability.fn_api_runner.EmbeddedWorkerHandler object at 0x127fdaa58>}
```
It's coming from this PR:
<https://github.com/apache/beam/commit/921a9a83a156eb6af500ace1e44b91821ad426b1#diff-adf22e1eb39b451e73d4541052a33686R2015>
```
def get_worker_handlers(
    self,
    environment_id,  # type: Optional[str]
    num_workers  # type: int
):
  # type: (...) -> List[WorkerHandler]
  if environment_id is None:
    # Any environment will do, pick one arbitrarily.
    environment_id = next(iter(self._environments.keys()))
  environment = self._environments[environment_id]

  # assume all environments except EMBEDDED_PYTHON use gRPC.
  if environment.urn == python_urns.EMBEDDED_PYTHON:
    # special case for EmbeddedWorkerHandler: there's no need for a gRPC
    # server, but to pass the type check on WorkerHandler.create() we
    # make like we have a GrpcServer instance.
    self._grpc_server = cast(GrpcServer, None)
  elif self._grpc_server is None:
    self._grpc_server = GrpcServer(
        self._state, self._job_provision_info, self)

  worker_handler_list = self._cached_handlers[environment_id]
  if len(worker_handler_list) < num_workers:
    for _ in range(len(worker_handler_list), num_workers):
      worker_handler = WorkerHandler.create(
          environment,
          self._state,
          self._job_provision_info,
          self._grpc_server)
      _LOGGER.info(
          "Created Worker handler %s for environment %s",
          worker_handler,
          environment)
      self._cached_handlers[environment_id].append(worker_handler)
      self._workers_by_id[worker_handler.worker_id] = worker_handler
      worker_handler.start_worker()
  _LOGGER.error("created %s workers %s", num_workers, self._workers_by_id)
  return self._cached_handlers[environment_id][:num_workers]

```
Is this supposed to be logged at the info level?

Thanks!

Ning.

Re: Error logging from fn_api_runners

Posted by Robert Bradshaw <ro...@google.com>.
Yeah, this was an oversight on my part. I don't think we need to log
this at all. https://github.com/apache/beam/pull/11021 for anyone to
look at.
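
Until a fix like that is released, one possible workaround on the user side is to filter the message out with the standard `logging` module. This is only a sketch: the logger name is taken from the log output quoted in this thread, and the `DropWorkerCountMessage` class is a hypothetical helper, not part of Beam.

```python
import logging

# Logger name taken from the log output quoted in this thread.
RUNNER_LOGGER = "apache_beam.runners.portability.fn_api_runner"


class DropWorkerCountMessage(logging.Filter):
  """Hypothetical filter that drops the 'created N workers' record."""

  def filter(self, record):
    # record.msg is the unformatted format string passed to the logger,
    # so match on the template rather than on the rendered message.
    return not str(record.msg).startswith("created %s workers")


# Attach the filter to the logger that emits the record. Other records
# from the same logger are unaffected.
logging.getLogger(RUNNER_LOGGER).addFilter(DropWorkerCountMessage())
```

Matching on the template keeps genuine errors from the same module visible, which raising the logger's level would not.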

On Mon, Mar 2, 2020 at 2:44 PM Heejong Lee <he...@google.com> wrote:
>
> I think it should be either info or debug but not error.

Re: Error logging from fn_api_runners

Posted by Heejong Lee <he...@google.com>.
I think it should be either info or debug but not error.
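
To illustrate why the level matters, here is a small standalone sketch using only the standard `logging` module. The logger name `demo.fn_api_runner` and the `workers` dict are made up for the demonstration. With the threshold at WARNING (the root logger's default), debug and info records are dropped, while an error record always gets through.

```python
import io
import logging

# Hypothetical logger name, used only for this demonstration.
demo_logger = logging.getLogger("demo.fn_api_runner")
demo_logger.propagate = False  # keep the demo isolated from root handlers

# Capture output in memory so it can be inspected afterwards.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
demo_logger.addHandler(handler)

# Same threshold as the root logger's default.
demo_logger.setLevel(logging.WARNING)

workers = {"worker_5": "handler"}  # stand-in for the real handler map
demo_logger.debug("created %s workers %s", 1, workers)  # dropped
demo_logger.info("created %s workers %s", 1, workers)   # dropped
demo_logger.error("created %s workers %s", 1, workers)  # emitted

captured = stream.getvalue().splitlines()
print(captured)
```

Only the error call produces a line, which matches what Ning saw: a routine status message logged at error level shows up even when the user has not enabled verbose logging.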
