You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:30:04 UTC
[GitHub] [beam] rohdesamuel opened a new pull request #12413: Add max count to utils.to_element_list
rohdesamuel opened a new pull request #12413:
URL: https://github.com/apache/beam/pull/12413
Change-Id: I9a2fbf1532b3d22a612e7a09f4f1fb2b9635c40b
PR 4/7
Reading elements from cache can yield TestStreamFileRecords which can have many elements that exceeds the requested element count.
This also adds functionality to decode more types of elements (WindowedValueHolder and "Any").
------------------------
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
--- | --- | --- | --- | --- | --- | ---
Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/i
con)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](htt
ps://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_
Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_P
ostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
Pre-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
--- |Java | Python | Go | Website
--- | --- | --- | --- | ---
Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/b
eam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on a change in pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12413:
URL: https://github.com/apache/beam/pull/12413#discussion_r467247182
##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -27,30 +27,46 @@
import pandas as pd
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.testing.test_stream import WindowedValueHolder
def to_element_list(
reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
coder, # type: Coder
- include_window_info # type: bool
+ include_window_info, # type: bool
+ n=None # type: int
):
# type: (...) -> List[WindowedValue]
"""Returns an iterator that properly decodes the elements from the reader.
"""
- for e in reader:
- if isinstance(e, TestStreamPayload.Event):
- if (e.HasField('watermark_event') or e.HasField('processing_time_event')):
- continue
+ # Defining a generator like this makes it easier to limit the count of
+ # elements read. Otherwise, the count limit would need to be duplicated.
+ def elements():
+ for e in reader:
+ if isinstance(e, TestStreamPayload.Event):
+ if (e.HasField('watermark_event') or
+ e.HasField('processing_time_event')):
+ continue
+ else:
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] pabloem merged pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12413:
URL: https://github.com/apache/beam/pull/12413
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670772296
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670772725
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670772593
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670772413
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670771070
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] davidyan74 commented on a change in pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
davidyan74 commented on a change in pull request #12413:
URL: https://github.com/apache/beam/pull/12413#discussion_r466657201
##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -27,30 +27,46 @@
import pandas as pd
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
+from apache_beam.testing.test_stream import WindowedValueHolder
def to_element_list(
reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
coder, # type: Coder
- include_window_info # type: bool
+ include_window_info, # type: bool
+ n=None # type: int
):
# type: (...) -> List[WindowedValue]
"""Returns an iterator that properly decodes the elements from the reader.
"""
- for e in reader:
- if isinstance(e, TestStreamPayload.Event):
- if (e.HasField('watermark_event') or e.HasField('processing_time_event')):
- continue
+ # Defining a generator like this makes it easier to limit the count of
+ # elements read. Otherwise, the count limit would need to be duplicated.
+ def elements():
+ for e in reader:
+ if isinstance(e, TestStreamPayload.Event):
+ if (e.HasField('watermark_event') or
+ e.HasField('processing_time_event')):
+ continue
+ else:
Review comment:
nit: this else is not necessary since the flow ends with continue.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670158103
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] rohdesamuel commented on pull request #12413: [BEAM-10603] Add max count to utils.to_element_list
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12413:
URL: https://github.com/apache/beam/pull/12413#issuecomment-670773173
Run PythonDocker PreCommit
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org