Posted to dev@beam.apache.org by Chad Dombrova <ch...@gmail.com> on 2019/11/06 18:30:34 UTC

Questions about the current and future design of the job service message stream

Hi all,
I’ve been working lately on improving the state stream and message stream
on the job service (links to issues and PRs below), and I’m somewhat
confused by the inclusion of states in the message stream, since there’s a
separate dedicated state stream for that. Here’s the proto for the message
response:

message GetJobStateResponse {
  JobState.Enum state = 1; // (required)
}
message JobMessage {
  string message_id = 1;
  string time = 2;
  MessageImportance importance = 3;
  string message_text = 4;

  enum MessageImportance {
    MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
    JOB_MESSAGE_DEBUG = 1;
    JOB_MESSAGE_DETAILED = 2;
    JOB_MESSAGE_BASIC = 3;
    JOB_MESSAGE_WARNING = 4;
    JOB_MESSAGE_ERROR = 5;
  }
}
message JobMessagesResponse {
  oneof response {
    JobMessage message_response = 1;
    GetJobStateResponse state_response = 2;
  }
}

You can see that each JobMessagesResponse may contain a message *or* a
GetJobStateResponse.

What’s the intention behind this design?

The main benefit I see is that it’s easier to ensure that the state and
message logs are properly ordered. For example, in the code below it’s
unclear at a glance (at least to me) whether we’d need to use locking
between the main thread and read_messages thread if the main thread were
made solely responsible for logging state messages:

  def wait_until_finish(self):

    def read_messages():
      for message in self._message_stream:
        if message.HasField('message_response'):
          logging.log(
              MESSAGE_LOG_LEVELS[message.message_response.importance],
              "%s",
              message.message_response.message_text)
        else:
          logging.info(
              "Job state changed to %s",
              self._runner_api_state_to_pipeline_state(
                  message.state_response.state))
        self._messages.append(message)

    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
    t.daemon = True
    t.start()

    try:
      for state_response in self._state_stream:
        self._state = self._runner_api_state_to_pipeline_state(
            state_response.state)
        if state_response.state in TERMINAL_STATES:
          # Wait for any last messages.
          t.join(10)
          break
      if self._state != runner.PipelineState.DONE:
        raise RuntimeError(
            'Pipeline %s failed in state %s: %s' % (
                self._job_id, self._state, self._last_error_message()))
      return self._state
    finally:
      self._cleanup()
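
To make the locking question concrete, here is a minimal sketch, not Beam code. It assumes only Python's standard logging module, whose handlers serialize emit() behind a per-handler lock. ListHandler and log_from_two_threads are hypothetical names made up for this illustration. With one thread logging states and another logging messages, no extra lock is needed for the logging calls themselves and each thread's records stay in their own order, but the interleaving between the two threads is not deterministic:

```python
import logging
import threading


class ListHandler(logging.Handler):
    """Hypothetical handler that collects formatted records in a list."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        # Handler.handle() holds the handler's lock while emit() runs.
        self.records.append(record.getMessage())


def log_from_two_threads(states, messages):
    """Log states from the calling thread and messages from a reader thread."""
    logger = logging.getLogger("job_stream_sketch")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)

    def read_messages():
        for text in messages:
            logger.info("message: %s", text)

    t = threading.Thread(target=read_messages, name="read_messages_sketch")
    t.start()
    for state in states:
        logger.info("state: %s", state)
    t.join()
    logger.removeHandler(handler)
    return handler.records
```

So the open question is less about locking and more about whether the relative order of state and message records matters, which a single mixed stream gives for free.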

The reason this is important to me is I’d like to make a handful of changes
to GetMessageStream to make it more powerful:

   - propagate messages from user code (if they opt in to setting up their
   logger appropriately). Currently, AFAICT, other than state changes, the
   only message the message stream delivers is a final error if the job
   fails. It was clearly the original intent of this endpoint to carry
   other types of messages, and I'd like to bring that to fruition.
   - make it possible to backfill log messages when a client connection is
   made (limited by a min timestamp and/or max number of messages), so that
   a client that connects late can still easily catch up with a limited
   amount of recent activity.
   - make it possible to back GetMessageStream with logging services like
   StackDriver, CloudWatch, or Elasticsearch
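
As a rough illustration of the backfill idea, here is a minimal sketch of a bounded in-memory history that a job service could consult when a client connects. MessageHistory and its parameters (max_size, min_timestamp, max_messages) are hypothetical names for this example, not part of the existing job service:

```python
import collections
import threading


class MessageHistory:
    """Hypothetical bounded buffer of (timestamp, text) pairs."""

    def __init__(self, max_size=1000):
        self._lock = threading.Lock()
        # deque(maxlen=...) silently drops the oldest entry when full.
        self._messages = collections.deque(maxlen=max_size)

    def append(self, timestamp, text):
        with self._lock:
            self._messages.append((timestamp, text))

    def backfill(self, min_timestamp=None, max_messages=None):
        """Return retained messages, oldest first, honoring both limits."""
        with self._lock:
            selected = [
                m for m in self._messages
                if min_timestamp is None or m[0] >= min_timestamp
            ]
        if max_messages is not None:
            # Keep only the most recent max_messages entries.
            selected = selected[-max_messages:] if max_messages > 0 else []
        return selected
```

A late client would first receive backfill(...) and then continue with live messages.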

Mixing state changes and log messages in the same stream adds some wrinkles
to this plan, especially for the last one.  The reason is that log messages
will come primarily from user code, whereas state changes come from the
runner, and it might require some unwanted abstractions throughout the
various runners to enable them to deliver state changes to this external
service, whereas delivering user logs is very straightforward - just set up
your logging handler.

I’d love to know others' thoughts on what they’d like to see out of the
future of logging in Beam.

*Current Progress:*

Sorting out proper state transitions:

   - https://issues.apache.org/jira/browse/BEAM-8552
   - https://issues.apache.org/jira/browse/BEAM-8539
   - https://github.com/apache/beam/pull/9965

Adding timestamps and history to the state stream:

   - https://issues.apache.org/jira/browse/BEAM-8523
   - https://github.com/apache/beam/pull/9959

-chad

Re: Questions about the current and future design of the job service message stream

Posted by Luke Cwik <lc...@google.com>.
On Sun, Nov 10, 2019 at 5:06 PM Chad Dombrova <ch...@gmail.com> wrote:

> Hi,
>
>>> You can see that each JobMessagesResponse may contain a message *or* a
>>> GetJobStateResponse.
>>>
>>> What’s the intention behind this design?
>>>
>> I believe this was because a user may want to listen to both job state
>> and messages all in one stream.
>>
>
> Just to be crystal clear, what's the advantage of using a single stream
> versus two?
>

gRPC guarantees that messages within a stream are delivered in order, and
it's purely a convenience, since you would likely want to know about both
job state changes and any important messages from the runner. I'm sure that
this could be changed though.

>>> The reason this is important to me is I’d like to make a handful of
>>> changes to GetMessageStream to make it more powerful:
>>>
>>>    - propagate messages from user code (if they opt in to setting up
>>>    their logger appropriately). currently, AFAICT, the only message the
>>>    message stream delivers is a final error, if the job fails (other than
>>>    state changes). It was clearly the original intent of this endpoint to
>>>    carry other types of messages, and I'd like to bring that to fruition.
>>>
>> Log messages are a lot of data: we do have users writing GBs/s when
>> aggregated across all their machines in Google Cloud, so I'm not sure this
>> will scale without a lot of control over filtering. Users sometimes don't
>> recognize how much they are logging, and if you have 1000 VMs each writing
>> only a few lines at a time you can easily saturate this stream.
>>
>
> Yes, we're concerned about message volume as well.  The plan would be to
> add filters, which could be propagated from the job server to the logger on
> the runner and sdk (if they support it) to avoid over-saturating the
> stream.  For example, the log-level right now is basically ERROR, so we'd
> propagate that to the runner and it would only send error messages back to
> the job server.  Thus, we should hopefully be able to roll out this feature
> without much change to the end user.  They could then opt-in to higher
> volume message levels, if desired.
>
> Some possible filters could be:
>
>    - job id (required)
>    - log level (default=ERROR)
>    - transform id(s) (optional. defaults to just runner messages)
>    - a jsonpath <https://github.com/json-path/JsonPath> selector for
>    filtering on message metadata?
>
> I think a logging implementation would consist of 2 parts:  the logging
> service (i.e. an implementation of GetMessageStream) and the logging
> handler for emitting messages from the runner and optionally user
> transforms.  Handlers would need to be implemented for each SDK (i.e.
> language).
>
> The default logging implementation would consist of the InMemoryJobService
> on the servicer side, which would send the filter object to the handler.
>  The handler would pre-filter messages and stream them back to the standard
> job service, which would simply forward on everything it receives, as it
> does now.
>
> A StackDriver logging service would be a bit different.  Its logging
> handler might send *everything* to StackDriver so that there's a complete
> record that can be sifted through later.  Its servicer component would
> interpret the filter object into a StackDriver filter string and create a
> subscription with StackDriver.
>
> In this way we could support both semi-persistent logging services with a
> queryable history (like StackDriver) and completely transient message
> streams like we have now.
>

I see, only in the former case would you push the logging configuration to
the SDK.
I was interested in being able to have dynamic logging configuration for
the SDK so that users would be able to change the log levels of their
pipeline as it's running from a UI. A common use case is that you have a
long-running streaming pipeline that has started to fail on some piece of
data, and you want to increase the log level to get more details. The
LogControl message was meant to enable such a feature[1].

>
>>>    - make it possible to back GetMessageStream with logging services
>>>    like StackDriver, CloudWatch, or Elasticsearch
>>>
>> That is interesting; originally the message stream was designed around
>> system messages from the runner and not specifically around user log
>> messages, due to volume concerns. All logging integration to my knowledge
>> has been deferred to the client libraries for those specific services.
>>
>
> What we're after is a user experience akin to what the Dataflow UI
> provides: view a pipeline, open the log console, and view recent messages
> from the runner.  click on a transform to view messages emitted by that
> transform.  We've found Flink's logging and log UI to be sorely lacking and
> we like the idea of tackling this problem at the Beam level, especially
> considering so much of what we want is already there in some form.
>
> Another use case that I think would benefit from this is providing custom
> progress messages to users who launch a batch job from a shell, since the
> message stream is already emitted there.   Of course, you'd have to be
> careful about message volume, but as I mentioned there would be 2 levels
> where you'd need to opt in:
>
>    - changing log level from its default (ERROR)
>    - setting up transform-level logging
>
>
> -chad
>
>
1:
https://github.com/apache/beam/blob/0178652b6c5b2e34f8b1a562e2d398eebe63587a/model/fn-execution/src/main/proto/beam_fn_api.proto#L791

Re: Questions about the current and future design of the job service message stream

Posted by Chad Dombrova <ch...@gmail.com>.
Hi,

>> You can see that each JobMessagesResponse may contain a message *or* a
>> GetJobStateResponse.
>>
>> What’s the intention behind this design?
>>
> I believe this was because a user may want to listen to both job state and
> messages all in one stream.
>

Just to be crystal clear, what's the advantage of using a single stream
versus two?

>> The reason this is important to me is I’d like to make a handful of
>> changes to GetMessageStream to make it more powerful:
>>
>>    - propagate messages from user code (if they opt in to setting up
>>    their logger appropriately). currently, AFAICT, the only message the
>>    message stream delivers is a final error, if the job fails (other than
>>    state changes). It was clearly the original intent of this endpoint to
>>    carry other types of messages, and I'd like to bring that to fruition.
>>
> Log messages are a lot of data: we do have users writing GBs/s when
> aggregated across all their machines in Google Cloud, so I'm not sure this
> will scale without a lot of control over filtering. Users sometimes don't
> recognize how much they are logging, and if you have 1000 VMs each writing
> only a few lines at a time you can easily saturate this stream.
>

Yes, we're concerned about message volume as well.  The plan would be to
add filters, which could be propagated from the job server to the logger on
the runner and sdk (if they support it) to avoid over-saturating the
stream.  For example, the log-level right now is basically ERROR, so we'd
propagate that to the runner and it would only send error messages back to
the job server.  Thus, we should hopefully be able to roll out this feature
without much change to the end user.  They could then opt-in to higher
volume message levels, if desired.

Some possible filters could be:

   - job id (required)
   - log level (default=ERROR)
   - transform id(s) (optional. defaults to just runner messages)
   - a jsonpath <https://github.com/json-path/JsonPath> selector for
   filtering on message metadata?
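
To make the filter list above a bit more concrete, here is a minimal sketch of what such a filter object might look like. MessageFilter and its field names are hypothetical, chosen for this example only. It uses Python logging levels rather than the MessageImportance enum and leaves out the jsonpath selector:

```python
import dataclasses
import logging
from typing import FrozenSet


@dataclasses.dataclass(frozen=True)
class MessageFilter:
    """Hypothetical filter sent from the job service to logging handlers."""

    job_id: str                                   # required
    min_level: int = logging.ERROR                # default=ERROR
    transform_ids: FrozenSet[str] = frozenset()   # empty: runner messages only

    def matches(self, job_id, level, transform_id=None):
        """Return True if a message with these attributes should be sent."""
        if job_id != self.job_id or level < self.min_level:
            return False
        if transform_id is None:
            # A message with no transform id is treated as a runner message.
            return True
        return transform_id in self.transform_ids
```

With the defaults, only runner messages at ERROR or above pass, which matches today's behavior; opting in means lowering min_level or listing transform ids.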

I think a logging implementation would consist of 2 parts:  the logging
service (i.e. an implementation of GetMessageStream) and the logging
handler for emitting messages from the runner and optionally user
transforms.  Handlers would need to be implemented for each SDK (i.e.
language).

The default logging implementation would consist of the InMemoryJobService
on the servicer side, which would send the filter object to the handler.
 The handler would pre-filter messages and stream them back to the standard
job service, which would simply forward on everything it receives, as it
does now.
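
On the Python side, the handler half of the default implementation could be sketched roughly as below. This is an assumption-laden illustration, not existing Beam code: ForwardingHandler and make_logger are hypothetical names, and the `send` callable stands in for whatever would actually stream messages back to the job service. The pre-filtering here is just the standard logging.Handler level check:

```python
import logging


class ForwardingHandler(logging.Handler):
    """Hypothetical handler that pre-filters by level and forwards messages."""

    def __init__(self, send, level=logging.ERROR):
        # Records below `level` are dropped before emit() is ever called.
        super().__init__(level=level)
        self._send = send

    def emit(self, record):
        self._send({
            "importance": record.levelname,
            "message_text": record.getMessage(),
        })


def make_logger(name, send, level=logging.ERROR):
    """Attach a ForwardingHandler to a logger that does not propagate."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)  # let the handler decide what to keep
    logger.propagate = False
    logger.addHandler(ForwardingHandler(send, level))
    return logger
```

Because the threshold lives on the handler, a filter update from the job service could be applied at runtime by calling setLevel() on the handler.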

A StackDriver logging service would be a bit different.  Its logging
handler might send *everything* to StackDriver so that there's a complete
record that can be sifted through later.  Its servicer component would
interpret the filter object into a StackDriver filter string and create a
subscription with StackDriver.

In this way we could support both semi-persistent logging services with a
queryable history (like StackDriver) and completely transient message
streams like we have now.

>
>>    - make it possible to back GetMessageStream with logging services
>>    like StackDriver, CloudWatch, or Elasticsearch
>>
> That is interesting; originally the message stream was designed around
> system messages from the runner and not specifically around user log
> messages, due to volume concerns. All logging integration to my knowledge
> has been deferred to the client libraries for those specific services.
>

What we're after is a user experience akin to what the Dataflow UI
provides: view a pipeline, open the log console, and view recent messages
from the runner, then click on a transform to view messages emitted by that
transform.  We've found Flink's logging and log UI to be sorely lacking and
we like the idea of tackling this problem at the Beam level, especially
considering so much of what we want is already there in some form.

Another use case that I think would benefit from this is providing custom
progress messages to users who launch a batch job from a shell, since the
message stream is already emitted there.   Of course, you'd have to be
careful about message volume, but as I mentioned there would be 2 levels
where you'd need to opt in:

   - changing log level from its default (ERROR)
   - setting up transform-level logging


-chad

Re: Questions about the current and future design of the job service message stream

Posted by Luke Cwik <lc...@google.com>.
+Daniel Mills <mi...@google.com> for usability in job messages / logging
integration across Beam runners.

On Wed, Nov 6, 2019 at 10:30 AM Chad Dombrova <ch...@gmail.com> wrote:

> Hi all,
> I’ve been working lately on improving the state stream and message stream
> on the job service (links to issues and PRs below), and I’m somewhat
> confused by the inclusion of states in the message stream, since there’s a
> separate dedicated state stream for that. Here’s the proto for the message
> response:
>
> message GetJobStateResponse {
>   JobState.Enum state = 1; // (required)
> }
> message JobMessage {
>   string message_id = 1;
>   string time = 2;
>   MessageImportance importance = 3;
>   string message_text = 4;
>
>   enum MessageImportance {
>     MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
>     JOB_MESSAGE_DEBUG = 1;
>     JOB_MESSAGE_DETAILED = 2;
>     JOB_MESSAGE_BASIC = 3;
>     JOB_MESSAGE_WARNING = 4;
>     JOB_MESSAGE_ERROR = 5;
>   }
> }
> message JobMessagesResponse {
>   oneof response {
>     JobMessage message_response = 1;
>     GetJobStateResponse state_response = 2;
>   }
> }
>
> You can see that each JobMessagesResponse may contain a message *or* a
> GetJobStateResponse.
>
> What’s the intention behind this design?
>
I believe this was because a user may want to listen to both job state and
messages all in one stream.

> The main benefit I see is that it’s easier to ensure that the state and
> message logs are properly ordered. For example, in the code below it’s
> unclear at a glance (at least to me) whether we’d need to use locking
> between the main thread and read_messages thread if the main thread were
> made solely responsible for logging state messages:
>
>   def wait_until_finish(self):
>
>     def read_messages():
>       for message in self._message_stream:
>         if message.HasField('message_response'):
>           logging.log(
>               MESSAGE_LOG_LEVELS[message.message_response.importance],
>               "%s",
>               message.message_response.message_text)
>         else:
>           logging.info(
>               "Job state changed to %s",
>               self._runner_api_state_to_pipeline_state(
>                   message.state_response.state))
>         self._messages.append(message)
>
>     t = threading.Thread(target=read_messages, name='wait_until_finish_read')
>     t.daemon = True
>     t.start()
>
>     try:
>       for state_response in self._state_stream:
>         self._state = self._runner_api_state_to_pipeline_state(
>             state_response.state)
>         if state_response.state in TERMINAL_STATES:
>           # Wait for any last messages.
>           t.join(10)
>           break
>       if self._state != runner.PipelineState.DONE:
>         raise RuntimeError(
>             'Pipeline %s failed in state %s: %s' % (
>                 self._job_id, self._state, self._last_error_message()))
>       return self._state
>     finally:
>       self._cleanup()
>
> The reason this is important to me is I’d like to make a handful of
> changes to GetMessageStream to make it more powerful:
>
>    - propagate messages from user code (if they opt in to setting up
>    their logger appropriately). currently, AFAICT, the only message the
>    message stream delivers is a final error, if the job fails (other than
>    state changes). It was clearly the original intent of this endpoint to
>    carry other types of messages, and I'd like to bring that to fruition.
>
Log messages are a lot of data: we do have users writing GBs/s when
aggregated across all their machines in Google Cloud, so I'm not sure this
will scale without a lot of control over filtering. Users sometimes don't
recognize how much they are logging, and if you have 1000 VMs each writing
only a few lines at a time you can easily saturate this stream.

>
>    - make it possible to backfill log messages when a client connection
>    is made (limited by a min timestamp and/or max number of messages).  so if
>    a client connects late it can still easily catch up with a limited amount
>    of recent activity.
>
+1

>
>    - make it possible to back GetMessageStream with logging services like
>    StackDriver, CloudWatch, or Elasticsearch
>
That is interesting; originally the message stream was designed around
system messages from the runner and not specifically around user log
messages, due to volume concerns. All logging integration to my knowledge
has been deferred to the client libraries for those specific services.

>
>
> Mixing state changes and log messages in the same stream adds some
> wrinkles to this plan, especially for the last one.  The reason is that log
> messages will come primarily from user code, whereas state changes come
> from the runner, and it might require some unwanted abstractions throughout
> the various runners to enable them to deliver state changes to this
> external service, whereas delivering user logs is very straightforward -
> just setup your logging handler.
>
> I’d love to know others' thoughts on what they’d like to see out of the
> future of logging in Beam.
>
> *Current Progress:*
>
> Sorting out proper state transitions:
>
>    - https://issues.apache.org/jira/browse/BEAM-8552
>    - https://issues.apache.org/jira/browse/BEAM-8539
>    - https://github.com/apache/beam/pull/9965
>    <https://github.com/apache/beam/pull/9965>
>
> Adding timestamps and history to the state stream:
>
>    - https://issues.apache.org/jira/browse/BEAM-8523
>    - https://github.com/apache/beam/pull/9959
>
> -chad
>
>