You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "pabloem (via GitHub)" <gi...@apache.org> on 2023/02/02 18:03:16 UTC

[GitHub] [beam] pabloem commented on a diff in pull request #25093: Task #25064: Python SDK Data sampling implementation

pabloem commented on code in PR #25093:
URL: https://github.com/apache/beam/pull/25093#discussion_r1093829146


##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -144,6 +145,11 @@ def create_harness(environment, dry_run=False):
 
   if dry_run:
     return
+
+  data_sampler = None
+  if 'enable_data_sampling' in experiments:

Review Comment:
   Is sampling enabled for the full run of the pipeline, or only momentarily?



##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Functionaliry for sampling elements during bundle execution."""
+
+# pytype: skip-file
+
+import collections
+import threading
+from typing import Any
+from typing import DefaultDict
+from typing import Deque
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from apache_beam.coders.coder_impl import CoderImpl
+from apache_beam.coders.coder_impl import WindowedValueCoderImpl
+from apache_beam.coders.coders import Coder
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class OutputSampler:
+  """Represents a way to sample an output of a PTransform.
+
+  This is configurable to only keep max_samples (see constructor) sampled
+  elements in memory. The first 10 elements are always sampled, then each
+  subsequent sample_every_n (see constructor).
+  """
+  def __init__(
+      self,
+      coder: Coder,
+      max_samples: int = 10,
+      sample_every_n: int = 1000) -> None:
+    self._samples: Deque[Any] = collections.deque(maxlen=max_samples)
+    self._coder_impl: CoderImpl = coder.get_impl()
+    self._sample_count: int = 0
+    self._sample_every_n: int = sample_every_n

Review Comment:
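   I'm wondering whether we might want a different sampling algorithm for, e.g., streaming bundles that may never reach 1,000 elements (after the first 10 elements, only every `sample_every_n`-th element is sampled, so such bundles would yield few or no further samples).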



##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Functionaliry for sampling elements during bundle execution."""
+
+# pytype: skip-file
+
+import collections
+import threading
+from typing import Any
+from typing import DefaultDict
+from typing import Deque
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from apache_beam.coders.coder_impl import CoderImpl
+from apache_beam.coders.coder_impl import WindowedValueCoderImpl
+from apache_beam.coders.coders import Coder
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class OutputSampler:
+  """Represents a way to sample an output of a PTransform.
+
+  This is configurable to only keep max_samples (see constructor) sampled
+  elements in memory. The first 10 elements are always sampled, then each
+  subsequent sample_every_n (see constructor).
+  """
+  def __init__(
+      self,
+      coder: Coder,
+      max_samples: int = 10,
+      sample_every_n: int = 1000) -> None:
+    self._samples: Deque[Any] = collections.deque(maxlen=max_samples)
+    self._coder_impl: CoderImpl = coder.get_impl()
+    self._sample_count: int = 0
+    self._sample_every_n: int = sample_every_n
+
+  def remove_windowed_value(self, el: Union[WindowedValue, Any]) -> Any:
+    """Retrieves the value from the WindowedValue.
+
+    The Python SDK passes elements as WindowedValues, which may not match the
+    coder for that particular PCollection.
+    """
+    if isinstance(el, WindowedValue):
+      return self.remove_windowed_value(el.value)
+    return el
+
+  def flush(self) -> List[bytes]:
+    """Returns all samples and clears buffer."""
+    if isinstance(self._coder_impl, WindowedValueCoderImpl):
+      samples = [s for s in self._samples]
+    else:
+      samples = [self.remove_windowed_value(s) for s in self._samples]
+
+    self._samples.clear()
+    return [self._coder_impl.encode(s) for s in samples]
+
+  def sample(self, element: Any) -> None:
+    """Samples the given element to an internal buffer.
+
+    Samples are only taken for the first 10 elements then every
+    `self._sample_every_n` after.
+    """
+    self._sample_count += 1
+
+    if (self._sample_count <= 10 or
+        self._sample_count % self._sample_every_n == 0):
+      self._samples.append(element)
+
+
+class DataSampler:
+  """A class for querying any samples generated during execution.
+
+  This class is meant to be a singleton with regard to a particular
+  `sdk_worker.SdkHarness`. When creating the operators, individual
+  `OutputSampler`s are created from `DataSampler.sample_output`. This allows for
+  multi-threaded sampling of a PCollection across the SdkHarness.
+
+  Samples generated during execution can then be sampled with the `samples`
+  method. This can filter to samples from a descriptor id and pcollection id.
+  """
+  def __init__(self, max_samples: int = 10, sample_every_n: int = 1000) -> None:
+    # Key is a tuple of (ProcessBundleDescriptor id, PCollection id). Is guarded
+    # by the _samplers_lock.
+    self._samplers: Dict[Tuple[str, str], OutputSampler] = {}
+    # Bundles are processed in parallel, so new samplers may be added when the
+    # runner queries for samples.
+    self._samplers_lock: threading.Lock = threading.Lock()
+    self._max_samples = max_samples
+    self._sampler_every_n = sample_every_n
+
+  def sample_output(
+      self, descriptor_id: str, pcoll_id: str, coder: Coder) -> OutputSampler:
+    """Create or get an OutputSampler for a (descriptor_id, pcoll_id) pair."""
+    key = (descriptor_id, pcoll_id)
+    with self._samplers_lock:
+      if key in self._samplers:
+        sampler = self._samplers[key]
+      else:
+        sampler = OutputSampler(coder, self._max_samples, self._sampler_every_n)
+        self._samplers[key] = sampler
+      return sampler
+
+  def samples(
+      self,
+      descriptor_ids: Optional[Iterable[str]] = None,
+      pcollection_ids: Optional[Iterable[str]] = None
+  ) -> Dict[str, List[bytes]]:
+    """Returns all samples filtered by descriptor ids and pcollection ids."""
+    ret: DefaultDict[str, List[bytes]] = collections.defaultdict(lambda: [])
+
+    with self._samplers_lock:
+      samplers = self._samplers.copy()
+
+    for sampler_id in samplers:
+      descriptor_id, pcoll_id = sampler_id
+      if descriptor_ids and descriptor_id not in descriptor_ids:
+        continue
+
+      if pcollection_ids and pcoll_id not in pcollection_ids:
+        continue
+
+      samples = samplers[sampler_id].flush()
+      if samples:
+        ret[pcoll_id].extend(samples)

Review Comment:
   What is a descriptor ID? I wonder whether you could iterate over `zip(descriptor_ids, pcollection_ids)` in pairs, so that you don't have to iterate over the whole dictionary of samplers (and maybe even avoid always copying the whole thing? I'm not sure).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org