You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/12 19:03:32 UTC
[beam] branch master updated: Add max count to utils.to_element_list
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 19c8d6f Add max count to utils.to_element_list
new 04f47cd Merge pull request #12413 from [BEAM-10603] Add max count to utils.to_element_list
19c8d6f is described below
commit 19c8d6fed91ab35297355b7aad39c95c19b21f83
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 15:26:30 2020 -0700
Add max count to utils.to_element_list
Change-Id: I9a2fbf1532b3d22a612e7a09f4f1fb2b9635c40b
---
.../apache_beam/runners/interactive/utils.py | 31 +++++++++++++-----
.../apache_beam/runners/interactive/utils_test.py | 38 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index b8784fa..43ff7b5 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -27,30 +27,45 @@ import logging
import pandas as pd
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.testing.test_stream import WindowedValueHolder
def to_element_list(
reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
coder, # type: Coder
- include_window_info # type: bool
+ include_window_info, # type: bool
+ n=None # type: int
):
# type: (...) -> List[WindowedValue]
"""Returns an iterator that properly decodes the elements from the reader.
"""
- for e in reader:
- if isinstance(e, TestStreamPayload.Event):
- if (e.HasField('watermark_event') or e.HasField('processing_time_event')):
- continue
- else:
+ # Defining a generator like this makes it easier to limit the count of
+ # elements read. Otherwise, the count limit would need to be duplicated.
+ def elements():
+ for e in reader:
+ if isinstance(e, TestStreamPayload.Event):
+ if (e.HasField('watermark_event') or
+ e.HasField('processing_time_event')):
+ continue
for tv in e.element_event.elements:
decoded = coder.decode(tv.encoded_element)
yield (
decoded.windowed_value
if include_window_info else decoded.windowed_value.value)
- else:
- yield e.windowed_value if include_window_info else e.windowed_value.value
+ elif isinstance(e, WindowedValueHolder):
+ yield (
+ e.windowed_value if include_window_info else e.windowed_value.value)
+ else:
+ yield e
+
+ # Because we can yield multiple elements from a single TestStreamFileRecord,
+ # we have to limit the count here to ensure that `n` is fulfilled.
+ for count, e in enumerate(elements()):
+ if n and count >= n:
+ break
+ yield e
def elements_to_df(elements, include_window_info=False):
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 616d250..3e4a546 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -25,8 +25,12 @@ import unittest
import numpy as np
import pandas as pd
+from apache_beam import coders
+from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
+from apache_beam.testing.test_stream import WindowedValueHolder
+from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
@@ -94,6 +98,40 @@ class ParseToDataframeTest(unittest.TestCase):
pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
+class ToElementListTest(unittest.TestCase):
+ def test_test_stream_payload_events(self):
+ """Tests that the to_element_list can limit the count in a single bundle."""
+
+ coder = coders.FastPrimitivesCoder()
+
+ def reader():
+ element_payload = [
+ TestStreamPayload.TimestampedElement(
+ encoded_element=coder.encode(
+ WindowedValueHolder(WindowedValue(e, 0, []))),
+ timestamp=Timestamp.of(0).micros) for e in range(10)
+ ]
+
+ event = TestStreamPayload.Event(
+ element_event=TestStreamPayload.Event.AddElements(
+ elements=element_payload))
+ yield event
+
+ # The reader creates 10 elements in a single TestStreamPayload but we limit
+ # the number of elements read to 5 here. This tests that the to_element_list
+ # can limit the number of elements in a single bundle.
+ elements = utils.to_element_list(
+ reader(), coder, include_window_info=False, n=5)
+ self.assertSequenceEqual(list(elements), list(range(5)))
+
+ def test_element_limit_count(self):
+ """Tests that the to_element_list can limit the count."""
+
+ elements = utils.to_element_list(
+ iter(range(10)), None, include_window_info=False, n=5)
+ self.assertSequenceEqual(list(elements), list(range(5)))
+
+
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')