You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/19 18:33:57 UTC
[beam] branch master updated: Rename
ProcessBundleProgressMetadataRequest to MonitoringInfosRequest. And rename
ProcessBundleProgressMetadataResponse to MonitoringInfosResponse
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6581a86 Rename ProcessBundleProgressMetadataRequest to MonitoringInfosRequest. And rename ProcessBundleProgressMetadataResponse to MonitoringInfosResponse
new e1df82b Merge pull request #13078 from ajamato/rename_protos
6581a86 is described below
commit 6581a86892d6f7e320a42b01422cd18a7c801100
Author: Alex Amato <aj...@google.com>
AuthorDate: Mon Oct 12 15:47:28 2020 -0700
Rename ProcessBundleProgressMetadataRequest to MonitoringInfosRequest. And rename ProcessBundleProgressMetadataResponse to MonitoringInfosResponse
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 37 ++--
.../apache_beam/runners/worker/sdk_worker.py | 188 +++++++++++----------
2 files changed, 118 insertions(+), 107 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 439b534..0211595 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -108,7 +108,7 @@ message InstructionRequest {
ProcessBundleProgressRequest process_bundle_progress = 1002;
ProcessBundleSplitRequest process_bundle_split = 1003;
FinalizeBundleRequest finalize_bundle = 1004;
- ProcessBundleProgressMetadataRequest process_bundle_progress_metadata = 1005;
+ MonitoringInfosMetadataRequest monitoring_infos = 1005;
// DEPRECATED
RegisterRequest register = 1000;
@@ -135,7 +135,7 @@ message InstructionResponse {
ProcessBundleProgressResponse process_bundle_progress = 1002;
ProcessBundleSplitResponse process_bundle_split = 1003;
FinalizeBundleResponse finalize_bundle = 1004;
- ProcessBundleProgressMetadataResponse process_bundle_progress_metadata = 1005;
+ MonitoringInfosMetadataResponse monitoring_infos = 1005;
// DEPRECATED
RegisterResponse register = 1000;
@@ -288,7 +288,7 @@ message ProcessBundleResponse {
// An SDK can report metrics using an identifier that only contains the
// associated payload. A runner who wants to receive the full metrics
// information can request all the monitoring metadata via a
- // ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+ // MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
// The SDK is allowed to reuse the identifiers across multiple bundles as long
// as the MonitoringInfo could be reconstructed fully by overwriting its
@@ -307,18 +307,18 @@ message ProcessBundleProgressRequest {
string instruction_id = 1;
}
-// A request to provide full MonitoringInfo for a given bundle.
+// A request to provide MonitoringInfo metadata for a given set of ids.
//
// An SDK can report metrics using an identifier that only contains the
// associated payload. A runner who wants to receive the full metrics
// information can request all the monitoring metadata via a
-// ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+// MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
-// The SDK is allowed to reuse the identifiers across multiple bundles as long
-// as the MonitoringInfo could be reconstructed fully by overwriting its
-// payload field with the bytes specified here.
-message ProcessBundleProgressMetadataRequest {
- // A list of ids for which the full MonitoringInfo is requested for.
+// The MonitoringInfo ids are scoped to the associated control connection. For
+// example, an SDK may reuse the ids across multiple bundles.
+message MonitoringInfosMetadataRequest {
+ // A list of ids for which MonitoringInfo are requested. All but the payload
+ // field will be populated.
repeated string monitoring_info_id = 1;
}
@@ -332,7 +332,7 @@ message ProcessBundleProgressResponse {
// An SDK can report metrics using an identifier that only contains the
// associated payload. A runner who wants to receive the full metrics
// information can request all the monitoring metadata via a
- // ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+ // MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
// The SDK is allowed to reuse the identifiers across multiple bundles as long
// as the MonitoringInfo could be reconstructed fully by overwriting its
@@ -348,13 +348,13 @@ message ProcessBundleProgressResponse {
// An SDK can report metrics using an identifier that only contains the
// associated payload. A runner who wants to receive the full metrics
// information can request all the monitoring metadata via a
-// ProcessBundleProgressMetadataRequest providing a list of ids as necessary.
+// MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
-// The SDK is allowed to reuse the identifiers across multiple bundles as long
-// as the MonitoringInfo could be reconstructed fully by overwriting its
-// payload field with the bytes specified here.
-message ProcessBundleProgressMetadataResponse {
- // A mapping from an identifier to the full metrics information.
+// The MonitoringInfo ids are scoped to the associated control connection. For
+// example, an SDK may reuse the ids across multiple bundles.
+message MonitoringInfosMetadataResponse {
+ // A mapping from a requested identifier to a MonitoringInfo. All fields
+ // except for the payload of the MonitoringInfo will be specified.
map<string, org.apache.beam.model.pipeline.v1.MonitoringInfo> monitoring_info = 1;
}
@@ -375,9 +375,6 @@ message ProcessBundleSplitRequest {
// A set of allowed element indices where the SDK may split. When this is
// empty, there are no constraints on where to split.
- // Specifically, the first_residual_element of a split result must be an
- // allowed split point, and the last_primary_element must immediately
- // preceded an allowed split point.
repeated int64 allowed_split_points = 3;
// (Required for GrpcRead operations) Number of total elements expected
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index c03e431..b3c095c 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -160,19 +160,20 @@ SHORT_ID_CACHE = ShortIdCache()
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
- def __init__(self,
- control_address, # type: str
- credentials=None, # type: Optional[grpc.ChannelCredentials]
- worker_id=None, # type: Optional[str]
- # Caching is disabled by default
- state_cache_size=0, # type: int
- # time-based data buffering is disabled by default
- data_buffer_time_limit_ms=0, # type: int
- profiler_factory=None, # type: Optional[Callable[..., Profile]]
- status_address=None, # type: Optional[str]
- # Heap dump through status api is disabled by default
- enable_heap_dump=False, # type: bool
- ):
+ def __init__(
+ self,
+ control_address, # type: str
+ credentials=None, # type: Optional[grpc.ChannelCredentials]
+ worker_id=None, # type: Optional[str]
+ # Caching is disabled by default
+ state_cache_size=0, # type: int
+ # time-based data buffering is disabled by default
+ data_buffer_time_limit_ms=0, # type: int
+ profiler_factory=None, # type: Optional[Callable[..., Profile]]
+ status_address=None, # type: Optional[str]
+ # Heap dump through status api is disabled by default
+ enable_heap_dump=False, # type: bool
+ ):
# type: (...) -> None
self._alive = True
self._worker_index = 0
@@ -276,10 +277,11 @@ class SdkHarness(object):
self._status_handler.close()
_LOGGER.info('Done consuming work.')
- def _execute(self,
- task, # type: Callable[[], beam_fn_api_pb2.InstructionResponse]
- request # type: beam_fn_api_pb2.InstructionRequest
- ):
+ def _execute(
+ self,
+ task, # type: Callable[[], beam_fn_api_pb2.InstructionResponse]
+ request # type: beam_fn_api_pb2.InstructionRequest
+ ):
# type: (...) -> None
with statesampler.instruction_id(request.instruction_id):
try:
@@ -367,11 +369,12 @@ class BundleProcessorCache(object):
"""
periodic_shutdown = None # type: Optional[PeriodicThread]
- def __init__(self,
- state_handler_factory, # type: StateHandlerFactory
- data_channel_factory, # type: data_plane.DataChannelFactory
- fns # type: MutableMapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
- ):
+ def __init__(
+ self,
+ state_handler_factory, # type: StateHandlerFactory
+ data_channel_factory, # type: data_plane.DataChannelFactory
+ fns # type: MutableMapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
+ ):
# type: (...) -> None
self.fns = fns
self.state_handler_factory = state_handler_factory
@@ -554,13 +557,13 @@ class BundleProcessorCache(object):
class SdkWorker(object):
-
- def __init__(self,
- bundle_processor_cache, # type: BundleProcessorCache
- state_cache_metrics_fn=list, # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
- profiler_factory=None, # type: Optional[Callable[..., Profile]]
- log_lull_timeout_ns=None, # type: Optional[int]
- ):
+ def __init__(
+ self,
+ bundle_processor_cache, # type: BundleProcessorCache
+ state_cache_metrics_fn=list, # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
+ profiler_factory=None, # type: Optional[Callable[..., Profile]]
+ log_lull_timeout_ns=None, # type: Optional[int]
+ ):
# type: (...) -> None
self.bundle_processor_cache = bundle_processor_cache
self.state_cache_metrics_fn = state_cache_metrics_fn
@@ -579,10 +582,11 @@ class SdkWorker(object):
else:
raise NotImplementedError
- def register(self,
- request, # type: beam_fn_api_pb2.RegisterRequest
- instruction_id # type: str
- ):
+ def register(
+ self,
+ request, # type: beam_fn_api_pb2.RegisterRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
"""Registers a set of ``beam_fn_api_pb2.ProcessBundleDescriptor``s.
@@ -598,10 +602,11 @@ class SdkWorker(object):
instruction_id=instruction_id,
register=beam_fn_api_pb2.RegisterResponse())
- def process_bundle(self,
- request, # type: beam_fn_api_pb2.ProcessBundleRequest
- instruction_id # type: str
- ):
+ def process_bundle(
+ self,
+ request, # type: beam_fn_api_pb2.ProcessBundleRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
bundle_processor = self.bundle_processor_cache.get(
instruction_id, request.process_bundle_descriptor_id)
@@ -632,10 +637,11 @@ class SdkWorker(object):
self.bundle_processor_cache.discard(instruction_id)
raise
- def process_bundle_split(self,
- request, # type: beam_fn_api_pb2.ProcessBundleSplitRequest
- instruction_id # type: str
- ):
+ def process_bundle_split(
+ self,
+ request, # type: beam_fn_api_pb2.ProcessBundleSplitRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
try:
processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -700,10 +706,11 @@ class SdkWorker(object):
# type: () -> None
thread_dump()
- def process_bundle_progress(self,
- request, # type: beam_fn_api_pb2.ProcessBundleProgressRequest
- instruction_id # type: str
- ):
+ def process_bundle_progress(
+ self,
+ request, # type: beam_fn_api_pb2.ProcessBundleProgressRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
try:
processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -726,22 +733,23 @@ class SdkWorker(object):
for info in monitoring_infos
}))
- def process_bundle_progress_metadata_request(self,
- request, # type: beam_fn_api_pb2.ProcessBundleProgressMetadataRequest
- instruction_id # type: str
- ):
+ def monitoring_infos_request(
+ self,
+ request, # type: beam_fn_api_pb2.MonitoringInfosMetadataRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
- process_bundle_progress_metadata=beam_fn_api_pb2.
- ProcessBundleProgressMetadataResponse(
+ monitoring_infos=beam_fn_api_pb2.MonitoringInfosMetadataResponse(
monitoring_info=SHORT_ID_CACHE.getInfos(
request.monitoring_info_id)))
- def finalize_bundle(self,
- request, # type: beam_fn_api_pb2.FinalizeBundleRequest
- instruction_id # type: str
- ):
+ def finalize_bundle(
+ self,
+ request, # type: beam_fn_api_pb2.FinalizeBundleRequest
+ instruction_id # type: str
+ ):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
try:
processor = self.bundle_processor_cache.lookup(request.instruction_id)
@@ -780,10 +788,11 @@ class SdkWorker(object):
class StateHandler(with_metaclass(abc.ABCMeta, object)): # type: ignore[misc]
"""An abstract object representing a ``StateHandler``."""
@abc.abstractmethod
- def get_raw(self,
- state_key, # type: beam_fn_api_pb2.StateKey
- continuation_token=None # type: Optional[bytes]
- ):
+ def get_raw(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ continuation_token=None # type: Optional[bytes]
+ ):
# type: (...) -> Tuple[bytes, Optional[bytes]]
raise NotImplementedError(type(self))
@@ -874,10 +883,11 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
class ThrowingStateHandler(StateHandler):
"""A state handler that errors on any requests."""
- def get_raw(self,
- state_key, # type: beam_fn_api_pb2.StateKey
- continuation_token=None # type: Optional[bytes]
- ):
+ def get_raw(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ continuation_token=None # type: Optional[bytes]
+ ):
# type: (...) -> Tuple[bytes, Optional[bytes]]
raise RuntimeError(
'Unable to handle state requests for ProcessBundleDescriptor without '
@@ -964,10 +974,11 @@ class GrpcStateHandler(StateHandler):
self._done = True
self._requests.put(self._DONE)
- def get_raw(self,
- state_key, # type: beam_fn_api_pb2.StateKey
- continuation_token=None # type: Optional[bytes]
- ):
+ def get_raw(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ continuation_token=None # type: Optional[bytes]
+ ):
# type: (...) -> Tuple[bytes, Optional[bytes]]
response = self._blocking_request(
beam_fn_api_pb2.StateRequest(
@@ -976,10 +987,11 @@ class GrpcStateHandler(StateHandler):
continuation_token=continuation_token)))
return response.get.data, response.get.continuation_token
- def append_raw(self,
- state_key, # type: Optional[beam_fn_api_pb2.StateKey]
- data # type: bytes
- ):
+ def append_raw(
+ self,
+ state_key, # type: Optional[beam_fn_api_pb2.StateKey]
+ data # type: bytes
+ ):
# type: (...) -> _Future
return self._request(
beam_fn_api_pb2.StateRequest(
@@ -1036,11 +1048,11 @@ class CachingStateHandler(object):
If activated but no cache token is supplied, caching is done at the bundle
level.
"""
-
- def __init__(self,
- global_state_cache, # type: StateCache
- underlying_state # type: StateHandler
- ):
+ def __init__(
+ self,
+ global_state_cache, # type: StateCache
+ underlying_state # type: StateHandler
+ ):
# type: (...) -> None
self._underlying = underlying_state
self._state_cache = global_state_cache
@@ -1078,10 +1090,11 @@ class CachingStateHandler(object):
self._context.user_state_cache_token = None
self._context.bundle_cache_token = None
- def blocking_get(self,
- state_key, # type: beam_fn_api_pb2.StateKey
- coder, # type: coder_impl.CoderImpl
- ):
+ def blocking_get(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ coder, # type: coder_impl.CoderImpl
+ ):
# type: (...) -> Iterable[Any]
cache_token = self._get_cache_token(state_key)
if not cache_token:
@@ -1107,11 +1120,12 @@ class CachingStateHandler(object):
state_key)
return cached_value
- def extend(self,
- state_key, # type: beam_fn_api_pb2.StateKey
- coder, # type: coder_impl.CoderImpl
- elements, # type: Iterable[Any]
- ):
+ def extend(
+ self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ coder, # type: coder_impl.CoderImpl
+ elements, # type: Iterable[Any]
+ ):
# type: (...) -> _Future
cache_token = self._get_cache_token(state_key)
if cache_token:
@@ -1161,7 +1175,7 @@ class CachingStateHandler(object):
state_key, # type: beam_fn_api_pb2.StateKey
coder, # type: coder_impl.CoderImpl
continuation_token=None # type: Optional[bytes]
- ):
+ ):
# type: (...) -> Iterator[Any]
"""Materializes the state lazily, one element at a time.
@@ -1196,7 +1210,7 @@ class CachingStateHandler(object):
self,
state_key, # type: beam_fn_api_pb2.StateKey
coder # type: coder_impl.CoderImpl
- ):
+ ):
# type: (...) -> Iterable[Any]
"""Materialized the first page of data, concatenated with a lazy iterable