You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "rohdesamuel (via GitHub)" <gi...@apache.org> on 2023/02/03 21:52:20 UTC

[GitHub] [beam] rohdesamuel commented on a diff in pull request #25093: Task #25064: Python SDK Data sampling implementation

rohdesamuel commented on code in PR #25093:
URL: https://github.com/apache/beam/pull/25093#discussion_r1096090142


##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Functionaliry for sampling elements during bundle execution."""
+
+# pytype: skip-file
+
+import collections
+import threading
+from typing import Any
+from typing import DefaultDict
+from typing import Deque
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from apache_beam.coders.coder_impl import CoderImpl
+from apache_beam.coders.coder_impl import WindowedValueCoderImpl
+from apache_beam.coders.coders import Coder
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class OutputSampler:
+  """Represents a way to sample an output of a PTransform.
+
+  This is configurable to only keep max_samples (see constructor) sampled
+  elements in memory. The first 10 elements are always sampled, then each
+  subsequent sample_every_n (see constructor).
+  """
+  def __init__(
+      self,
+      coder: Coder,
+      max_samples: int = 10,
+      sample_every_n: int = 1000) -> None:
+    self._samples: Deque[Any] = collections.deque(maxlen=max_samples)
+    self._coder_impl: CoderImpl = coder.get_impl()
+    self._sample_count: int = 0
+    self._sample_every_n: int = sample_every_n

Review Comment:
   That's a good point, and I was already mulling over changing this to a 30s timer instead of sampling every 1000 elements. If it's a timer, then we can also limit the sampling frequency for performance reasons. I'll change it to a timer.



##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -144,6 +145,11 @@ def create_harness(environment, dry_run=False):
 
   if dry_run:
     return
+
+  data_sampler = None
+  if 'enable_data_sampling' in experiments:

Review Comment:
   It will be enabled for a full run.



##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Functionaliry for sampling elements during bundle execution."""
+
+# pytype: skip-file
+
+import collections
+import threading
+from typing import Any
+from typing import DefaultDict
+from typing import Deque
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from apache_beam.coders.coder_impl import CoderImpl
+from apache_beam.coders.coder_impl import WindowedValueCoderImpl
+from apache_beam.coders.coders import Coder
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class OutputSampler:
+  """Represents a way to sample an output of a PTransform.
+
+  This is configurable to only keep max_samples (see constructor) sampled
+  elements in memory. The first 10 elements are always sampled, then each
+  subsequent sample_every_n (see constructor).
+  """
+  def __init__(
+      self,
+      coder: Coder,
+      max_samples: int = 10,
+      sample_every_n: int = 1000) -> None:
+    self._samples: Deque[Any] = collections.deque(maxlen=max_samples)
+    self._coder_impl: CoderImpl = coder.get_impl()
+    self._sample_count: int = 0
+    self._sample_every_n: int = sample_every_n
+
+  def remove_windowed_value(self, el: Union[WindowedValue, Any]) -> Any:
+    """Retrieves the value from the WindowedValue.
+
+    The Python SDK passes elements as WindowedValues, which may not match the
+    coder for that particular PCollection.
+    """
+    if isinstance(el, WindowedValue):
+      return self.remove_windowed_value(el.value)
+    return el
+
+  def flush(self) -> List[bytes]:
+    """Returns all samples and clears buffer."""
+    if isinstance(self._coder_impl, WindowedValueCoderImpl):
+      samples = [s for s in self._samples]
+    else:
+      samples = [self.remove_windowed_value(s) for s in self._samples]
+
+    self._samples.clear()
+    return [self._coder_impl.encode(s) for s in samples]
+
+  def sample(self, element: Any) -> None:
+    """Samples the given element to an internal buffer.
+
+    Samples are only taken for the first 10 elements then every
+    `self._sample_every_n` after.
+    """
+    self._sample_count += 1
+
+    if (self._sample_count <= 10 or
+        self._sample_count % self._sample_every_n == 0):
+      self._samples.append(element)
+
+
+class DataSampler:
+  """A class for querying any samples generated during execution.
+
+  This class is meant to be a singleton with regard to a particular
+  `sdk_worker.SdkHarness`. When creating the operators, individual
+  `OutputSampler`s are created from `DataSampler.sample_output`. This allows for
+  multi-threaded sampling of a PCollection across the SdkHarness.
+
+  Samples generated during execution can then be sampled with the `samples`
+  method. This can filter to samples from a descriptor id and pcollection id.
+  """
+  def __init__(self, max_samples: int = 10, sample_every_n: int = 1000) -> None:
+    # Key is a tuple of (ProcessBundleDescriptor id, PCollection id). Is guarded
+    # by the _samplers_lock.
+    self._samplers: Dict[Tuple[str, str], OutputSampler] = {}
+    # Bundles are processed in parallel, so new samplers may be added when the
+    # runner queries for samples.
+    self._samplers_lock: threading.Lock = threading.Lock()
+    self._max_samples = max_samples
+    self._sampler_every_n = sample_every_n
+
+  def sample_output(
+      self, descriptor_id: str, pcoll_id: str, coder: Coder) -> OutputSampler:
+    """Create or get an OutputSampler for a (descriptor_id, pcoll_id) pair."""
+    key = (descriptor_id, pcoll_id)
+    with self._samplers_lock:
+      if key in self._samplers:
+        sampler = self._samplers[key]
+      else:
+        sampler = OutputSampler(coder, self._max_samples, self._sampler_every_n)
+        self._samplers[key] = sampler
+      return sampler
+
+  def samples(
+      self,
+      descriptor_ids: Optional[Iterable[str]] = None,
+      pcollection_ids: Optional[Iterable[str]] = None
+  ) -> Dict[str, List[bytes]]:
+    """Returns all samples filtered by descriptor ids and pcollection ids."""
+    ret: DefaultDict[str, List[bytes]] = collections.defaultdict(lambda: [])
+
+    with self._samplers_lock:
+      samplers = self._samplers.copy()
+
+    for sampler_id in samplers:
+      descriptor_id, pcoll_id = sampler_id
+      if descriptor_ids and descriptor_id not in descriptor_ids:
+        continue
+
+      if pcollection_ids and pcoll_id not in pcollection_ids:
+        continue
+
+      samples = samplers[sampler_id].flush()
+      if samples:
+        ret[pcoll_id].extend(samples)

Review Comment:
   I clarified the filter in the comments, but I think the given code is the simplest to write. Even though it's not the most performant, this isn't called very frequently. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org