You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/19 18:33:57 UTC

[beam] branch master updated: Rename ProcessBundleProgressMetadataRequest to MonitoringInfosMetadataRequest and ProcessBundleProgressMetadataResponse to MonitoringInfosMetadataResponse

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6581a86  Rename ProcessBundleProgressMetadataRequest to MonitoringInfosMetadataRequest and ProcessBundleProgressMetadataResponse to MonitoringInfosMetadataResponse
     new e1df82b  Merge pull request #13078 from ajamato/rename_protos
6581a86 is described below

commit 6581a86892d6f7e320a42b01422cd18a7c801100
Author: Alex Amato <aj...@google.com>
AuthorDate: Mon Oct 12 15:47:28 2020 -0700

    Rename ProcessBundleProgressMetadataRequest to MonitoringInfosMetadataRequest and ProcessBundleProgressMetadataResponse to MonitoringInfosMetadataResponse
---
 .../fn-execution/src/main/proto/beam_fn_api.proto  |  37 ++--
 .../apache_beam/runners/worker/sdk_worker.py       | 188 +++++++++++----------
 2 files changed, 118 insertions(+), 107 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 439b534..0211595 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -108,7 +108,7 @@ message InstructionRequest {
     ProcessBundleProgressRequest process_bundle_progress = 1002;
     ProcessBundleSplitRequest process_bundle_split = 1003;
     FinalizeBundleRequest finalize_bundle = 1004;
-    ProcessBundleProgressMetadataRequest process_bundle_progress_metadata = 1005;
+    MonitoringInfosMetadataRequest monitoring_infos = 1005;
 
     // DEPRECATED
     RegisterRequest register = 1000;
@@ -135,7 +135,7 @@ message InstructionResponse {
     ProcessBundleProgressResponse process_bundle_progress = 1002;
     ProcessBundleSplitResponse process_bundle_split = 1003;
     FinalizeBundleResponse finalize_bundle = 1004;
-    ProcessBundleProgressMetadataResponse process_bundle_progress_metadata = 1005;
+    MonitoringInfosMetadataResponse monitoring_infos = 1005;
 
     // DEPRECATED
     RegisterResponse register = 1000;
@@ -288,7 +288,7 @@ message ProcessBundleResponse {
   // An SDK can report metrics using an identifier that only contains the
   // associated payload. A runner who wants to receive the full metrics
   // information can request all the monitoring metadata via a
-  // ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+  // MonitoringInfosMetadataRequest providing a list of ids as necessary.
   //
   // The SDK is allowed to reuse the identifiers across multiple bundles as long
   // as the MonitoringInfo could be reconstructed fully by overwriting its
@@ -307,18 +307,18 @@ message ProcessBundleProgressRequest {
   string instruction_id = 1;
 }
 
-// A request to provide full MonitoringInfo for a given bundle.
+// A request to provide full MonitoringInfo for a given id.
 //
 // An SDK can report metrics using an identifier that only contains the
 // associated payload. A runner who wants to receive the full metrics
 // information can request all the monitoring metadata via a
-// ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+// MonitoringInfosMetadataRequest providing a list of ids as necessary.
 //
-// The SDK is allowed to reuse the identifiers across multiple bundles as long
-// as the MonitoringInfo could be reconstructed fully by overwriting its
-// payload field with the bytes specified here.
-message ProcessBundleProgressMetadataRequest {
-  // A list of ids for which the full MonitoringInfo is requested for.
+// The MonitoringInfo ids are scoped to the associated control connection. For
+// example, an SDK may reuse the ids across multiple bundles.
+message MonitoringInfosMetadataRequest {
+  // A list of ids for which MonitoringInfo are requested. All but the payload
+  // field will be populated.
   repeated string monitoring_info_id = 1;
 }
 
@@ -332,7 +332,7 @@ message ProcessBundleProgressResponse {
   // An SDK can report metrics using an identifier that only contains the
   // associated payload. A runner who wants to receive the full metrics
   // information can request all the monitoring metadata via a
-  // ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+  // MonitoringInfosMetadataRequest providing a list of ids as necessary.
   //
   // The SDK is allowed to reuse the identifiers across multiple bundles as long
   // as the MonitoringInfo could be reconstructed fully by overwriting its
@@ -348,13 +348,13 @@ message ProcessBundleProgressResponse {
 // An SDK can report metrics using an identifier that only contains the
 // associated payload. A runner who wants to receive the full metrics
 // information can request all the monitoring metadata via a
-// ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+// MonitoringInfosMetadataRequest providing a list of ids as necessary.
 //
-// The SDK is allowed to reuse the identifiers across multiple bundles as long
-// as the MonitoringInfo could be reconstructed fully by overwriting its
-// payload field with the bytes specified here.
-message ProcessBundleProgressMetadataResponse {
-  // A mapping from an identifier to the full metrics information.
+// The MonitoringInfo ids are scoped to the associated control connection. For
+// example an SDK may reuse the ids across multiple bundles.
+message MonitoringInfosMetadataResponse {
+  // A mapping from a requested identifier to a MonitoringInfo. All fields
+  // except for the payload of the MonitoringInfo will be specified.
   map<string, org.apache.beam.model.pipeline.v1.MonitoringInfo> monitoring_info = 1;
 }
 
@@ -375,9 +375,6 @@ message ProcessBundleSplitRequest {
 
     // A set of allowed element indices where the SDK may split. When this is
     // empty, there are no constraints on where to split.
-    // Specifically, the first_residual_element of a split result must be an
-    // allowed split point, and the last_primary_element must immediately
-    // preceded an allowed split point.
     repeated int64 allowed_split_points = 3;
 
     // (Required for GrpcRead operations) Number of total elements expected
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index c03e431..b3c095c 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -160,19 +160,20 @@ SHORT_ID_CACHE = ShortIdCache()
 class SdkHarness(object):
   REQUEST_METHOD_PREFIX = '_request_'
 
-  def __init__(self,
-               control_address,  # type: str
-               credentials=None,  # type: Optional[grpc.ChannelCredentials]
-               worker_id=None,  # type: Optional[str]
-               # Caching is disabled by default
-               state_cache_size=0,  # type: int
-               # time-based data buffering is disabled by default
-               data_buffer_time_limit_ms=0,  # type: int
-               profiler_factory=None,  # type: Optional[Callable[..., Profile]]
-               status_address=None,  # type: Optional[str]
-               # Heap dump through status api is disabled by default
-               enable_heap_dump=False, # type: bool
-               ):
+  def __init__(
+      self,
+      control_address,  # type: str
+      credentials=None,  # type: Optional[grpc.ChannelCredentials]
+      worker_id=None,  # type: Optional[str]
+      # Caching is disabled by default
+      state_cache_size=0,  # type: int
+      # time-based data buffering is disabled by default
+      data_buffer_time_limit_ms=0,  # type: int
+      profiler_factory=None,  # type: Optional[Callable[..., Profile]]
+      status_address=None,  # type: Optional[str]
+      # Heap dump through status api is disabled by default
+      enable_heap_dump=False,  # type: bool
+  ):
     # type: (...) -> None
     self._alive = True
     self._worker_index = 0
@@ -276,10 +277,11 @@ class SdkHarness(object):
       self._status_handler.close()
     _LOGGER.info('Done consuming work.')
 
-  def _execute(self,
-               task,  # type: Callable[[], beam_fn_api_pb2.InstructionResponse]
-               request  # type:  beam_fn_api_pb2.InstructionRequest
-              ):
+  def _execute(
+      self,
+      task,  # type: Callable[[], beam_fn_api_pb2.InstructionResponse]
+      request  # type:  beam_fn_api_pb2.InstructionRequest
+  ):
     # type: (...) -> None
     with statesampler.instruction_id(request.instruction_id):
       try:
@@ -367,11 +369,12 @@ class BundleProcessorCache(object):
   """
   periodic_shutdown = None  # type: Optional[PeriodicThread]
 
-  def __init__(self,
-               state_handler_factory,  # type: StateHandlerFactory
-               data_channel_factory,  # type: data_plane.DataChannelFactory
-               fns  # type: MutableMapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
-              ):
+  def __init__(
+      self,
+      state_handler_factory,  # type: StateHandlerFactory
+      data_channel_factory,  # type: data_plane.DataChannelFactory
+      fns  # type: MutableMapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
+  ):
     # type: (...) -> None
     self.fns = fns
     self.state_handler_factory = state_handler_factory
@@ -554,13 +557,13 @@ class BundleProcessorCache(object):
 
 
 class SdkWorker(object):
-
-  def __init__(self,
-               bundle_processor_cache,  # type: BundleProcessorCache
-               state_cache_metrics_fn=list,  # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
-               profiler_factory=None,  # type: Optional[Callable[..., Profile]]
-               log_lull_timeout_ns=None,  # type: Optional[int]
-              ):
+  def __init__(
+      self,
+      bundle_processor_cache,  # type: BundleProcessorCache
+      state_cache_metrics_fn=list,  # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
+      profiler_factory=None,  # type: Optional[Callable[..., Profile]]
+      log_lull_timeout_ns=None,  # type: Optional[int]
+  ):
     # type: (...) -> None
     self.bundle_processor_cache = bundle_processor_cache
     self.state_cache_metrics_fn = state_cache_metrics_fn
@@ -579,10 +582,11 @@ class SdkWorker(object):
     else:
       raise NotImplementedError
 
-  def register(self,
-               request,  # type: beam_fn_api_pb2.RegisterRequest
-               instruction_id  # type: str
-              ):
+  def register(
+      self,
+      request,  # type: beam_fn_api_pb2.RegisterRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
 
     """Registers a set of ``beam_fn_api_pb2.ProcessBundleDescriptor``s.
@@ -598,10 +602,11 @@ class SdkWorker(object):
         instruction_id=instruction_id,
         register=beam_fn_api_pb2.RegisterResponse())
 
-  def process_bundle(self,
-                     request,  # type: beam_fn_api_pb2.ProcessBundleRequest
-                     instruction_id  # type: str
-                    ):
+  def process_bundle(
+      self,
+      request,  # type: beam_fn_api_pb2.ProcessBundleRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
     bundle_processor = self.bundle_processor_cache.get(
         instruction_id, request.process_bundle_descriptor_id)
@@ -632,10 +637,11 @@ class SdkWorker(object):
       self.bundle_processor_cache.discard(instruction_id)
       raise
 
-  def process_bundle_split(self,
-                           request,  # type: beam_fn_api_pb2.ProcessBundleSplitRequest
-                           instruction_id  # type: str
-                          ):
+  def process_bundle_split(
+      self,
+      request,  # type: beam_fn_api_pb2.ProcessBundleSplitRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
     try:
       processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -700,10 +706,11 @@ class SdkWorker(object):
     # type: () -> None
     thread_dump()
 
-  def process_bundle_progress(self,
-                              request,  # type: beam_fn_api_pb2.ProcessBundleProgressRequest
-                              instruction_id  # type: str
-                             ):
+  def process_bundle_progress(
+      self,
+      request,  # type: beam_fn_api_pb2.ProcessBundleProgressRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
     try:
       processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -726,22 +733,23 @@ class SdkWorker(object):
                 for info in monitoring_infos
             }))
 
-  def process_bundle_progress_metadata_request(self,
-                                               request,  # type: beam_fn_api_pb2.ProcessBundleProgressMetadataRequest
-                                               instruction_id  # type: str
-                                              ):
+  def monitoring_infos_request(
+      self,
+      request,  # type: beam_fn_api_pb2.MonitoringInfosMetadataRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
-        process_bundle_progress_metadata=beam_fn_api_pb2.
-        ProcessBundleProgressMetadataResponse(
+        monitoring_infos=beam_fn_api_pb2.MonitoringInfosMetadataResponse(
             monitoring_info=SHORT_ID_CACHE.getInfos(
                 request.monitoring_info_id)))
 
-  def finalize_bundle(self,
-                      request,  # type: beam_fn_api_pb2.FinalizeBundleRequest
-                      instruction_id  # type: str
-                     ):
+  def finalize_bundle(
+      self,
+      request,  # type: beam_fn_api_pb2.FinalizeBundleRequest
+      instruction_id  # type: str
+  ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
     try:
       processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -780,10 +788,11 @@ class SdkWorker(object):
 class StateHandler(with_metaclass(abc.ABCMeta, object)):  # type: ignore[misc]
   """An abstract object representing a ``StateHandler``."""
   @abc.abstractmethod
-  def get_raw(self,
-              state_key,  # type: beam_fn_api_pb2.StateKey
-              continuation_token=None  # type: Optional[bytes]
-             ):
+  def get_raw(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      continuation_token=None  # type: Optional[bytes]
+  ):
     # type: (...) -> Tuple[bytes, Optional[bytes]]
     raise NotImplementedError(type(self))
 
@@ -874,10 +883,11 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
 
 class ThrowingStateHandler(StateHandler):
   """A state handler that errors on any requests."""
-  def get_raw(self,
-              state_key,  # type: beam_fn_api_pb2.StateKey
-              continuation_token=None  # type: Optional[bytes]
-             ):
+  def get_raw(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      continuation_token=None  # type: Optional[bytes]
+  ):
     # type: (...) -> Tuple[bytes, Optional[bytes]]
     raise RuntimeError(
         'Unable to handle state requests for ProcessBundleDescriptor without '
@@ -964,10 +974,11 @@ class GrpcStateHandler(StateHandler):
     self._done = True
     self._requests.put(self._DONE)
 
-  def get_raw(self,
-              state_key,  # type: beam_fn_api_pb2.StateKey
-              continuation_token=None  # type: Optional[bytes]
-             ):
+  def get_raw(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      continuation_token=None  # type: Optional[bytes]
+  ):
     # type: (...) -> Tuple[bytes, Optional[bytes]]
     response = self._blocking_request(
         beam_fn_api_pb2.StateRequest(
@@ -976,10 +987,11 @@ class GrpcStateHandler(StateHandler):
                 continuation_token=continuation_token)))
     return response.get.data, response.get.continuation_token
 
-  def append_raw(self,
-                 state_key,  # type: Optional[beam_fn_api_pb2.StateKey]
-                 data  # type: bytes
-                ):
+  def append_raw(
+      self,
+      state_key,  # type: Optional[beam_fn_api_pb2.StateKey]
+      data  # type: bytes
+  ):
     # type: (...) -> _Future
     return self._request(
         beam_fn_api_pb2.StateRequest(
@@ -1036,11 +1048,11 @@ class CachingStateHandler(object):
    If activated but no cache token is supplied, caching is done at the bundle
    level.
   """
-
-  def __init__(self,
-               global_state_cache,  # type: StateCache
-               underlying_state  # type: StateHandler
-              ):
+  def __init__(
+      self,
+      global_state_cache,  # type: StateCache
+      underlying_state  # type: StateHandler
+  ):
     # type: (...) -> None
     self._underlying = underlying_state
     self._state_cache = global_state_cache
@@ -1078,10 +1090,11 @@ class CachingStateHandler(object):
       self._context.user_state_cache_token = None
       self._context.bundle_cache_token = None
 
-  def blocking_get(self,
-                   state_key,  # type: beam_fn_api_pb2.StateKey
-                   coder,  # type: coder_impl.CoderImpl
-                  ):
+  def blocking_get(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder,  # type: coder_impl.CoderImpl
+  ):
     # type: (...) -> Iterable[Any]
     cache_token = self._get_cache_token(state_key)
     if not cache_token:
@@ -1107,11 +1120,12 @@ class CachingStateHandler(object):
             state_key)
     return cached_value
 
-  def extend(self,
-             state_key,  # type: beam_fn_api_pb2.StateKey
-             coder,  # type: coder_impl.CoderImpl
-             elements,  # type: Iterable[Any]
-            ):
+  def extend(
+      self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      coder,  # type: coder_impl.CoderImpl
+      elements,  # type: Iterable[Any]
+  ):
     # type: (...) -> _Future
     cache_token = self._get_cache_token(state_key)
     if cache_token:
@@ -1161,7 +1175,7 @@ class CachingStateHandler(object):
       state_key,  # type: beam_fn_api_pb2.StateKey
       coder,  # type: coder_impl.CoderImpl
       continuation_token=None  # type: Optional[bytes]
-    ):
+  ):
     # type: (...) -> Iterator[Any]
 
     """Materializes the state lazily, one element at a time.
@@ -1196,7 +1210,7 @@ class CachingStateHandler(object):
       self,
       state_key,  # type: beam_fn_api_pb2.StateKey
       coder  # type: coder_impl.CoderImpl
-    ):
+  ):
     # type: (...) -> Iterable[Any]
 
     """Materialized the first page of data, concatenated with a lazy iterable