You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/12 19:03:32 UTC

[beam] branch master updated: Add max count to utils.to_element_list

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 19c8d6f  Add max count to utils.to_element_list
     new 04f47cd  Merge pull request #12413 from [BEAM-10603] Add max count to utils.to_element_list
19c8d6f is described below

commit 19c8d6fed91ab35297355b7aad39c95c19b21f83
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 15:26:30 2020 -0700

    Add max count to utils.to_element_list
    
    Change-Id: I9a2fbf1532b3d22a612e7a09f4f1fb2b9635c40b
---
 .../apache_beam/runners/interactive/utils.py       | 31 +++++++++++++-----
 .../apache_beam/runners/interactive/utils_test.py  | 38 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index b8784fa..43ff7b5 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -27,30 +27,45 @@ import logging
 import pandas as pd
 
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.testing.test_stream import WindowedValueHolder
 
 
 def to_element_list(
     reader,  # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
     coder,  # type: Coder
-    include_window_info  # type: bool
+    include_window_info,  # type: bool
+    n=None  # type: int
 ):
   # type: (...) -> List[WindowedValue]
 
   """Returns an iterator that properly decodes the elements from the reader.
   """
 
-  for e in reader:
-    if isinstance(e, TestStreamPayload.Event):
-      if (e.HasField('watermark_event') or e.HasField('processing_time_event')):
-        continue
-      else:
+  # Defining a generator like this makes it easier to limit the count of
+  # elements read. Otherwise, the count limit would need to be duplicated.
+  def elements():
+    for e in reader:
+      if isinstance(e, TestStreamPayload.Event):
+        if (e.HasField('watermark_event') or
+            e.HasField('processing_time_event')):
+          continue
         for tv in e.element_event.elements:
           decoded = coder.decode(tv.encoded_element)
           yield (
               decoded.windowed_value
               if include_window_info else decoded.windowed_value.value)
-    else:
-      yield e.windowed_value if include_window_info else e.windowed_value.value
+      elif isinstance(e, WindowedValueHolder):
+        yield (
+            e.windowed_value if include_window_info else e.windowed_value.value)
+      else:
+        yield e
+
+  # Because a single TestStreamPayload.Event can yield multiple elements, the
+  # count is limited here, per element, so that at most `n` are returned.
+  for count, e in enumerate(elements()):
+    if n and count >= n:
+      break
+    yield e
 
 
 def elements_to_df(elements, include_window_info=False):
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 616d250..3e4a546 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -25,8 +25,12 @@ import unittest
 import numpy as np
 import pandas as pd
 
+from apache_beam import coders
+from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import utils
+from apache_beam.testing.test_stream import WindowedValueHolder
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 # TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
@@ -94,6 +98,40 @@ class ParseToDataframeTest(unittest.TestCase):
     pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
 
 
+class ToElementListTest(unittest.TestCase):
+  def test_test_stream_payload_events(self):
+    """Tests that the to_element_list can limit the count in a single bundle."""
+
+    coder = coders.FastPrimitivesCoder()
+
+    def reader():
+      element_payload = [
+          TestStreamPayload.TimestampedElement(
+              encoded_element=coder.encode(
+                  WindowedValueHolder(WindowedValue(e, 0, []))),
+              timestamp=Timestamp.of(0).micros) for e in range(10)
+      ]
+
+      event = TestStreamPayload.Event(
+          element_event=TestStreamPayload.Event.AddElements(
+              elements=element_payload))
+      yield event
+
+    # The reader emits 10 elements in a single TestStreamPayload.Event, but we
+    # limit the number of elements read to 5. This checks that to_element_list
+    # can stop partway through a single bundle.
+    elements = utils.to_element_list(
+        reader(), coder, include_window_info=False, n=5)
+    self.assertSequenceEqual(list(elements), list(range(5)))
+
+  def test_element_limit_count(self):
+    """Tests that the to_element_list can limit the count."""
+
+    elements = utils.to_element_list(
+        iter(range(10)), None, include_window_info=False, n=5)
+    self.assertSequenceEqual(list(elements), list(range(5)))
+
+
 @unittest.skipIf(
     not ie.current_env().is_interactive_ready,
     '[interactive] dependency is not installed.')