You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/01 20:57:04 UTC

[GitHub] [beam] robertwb opened a new pull request, #22130: Allow one to bound the size of output shards when writing to files.

robertwb opened a new pull request, #22130:
URL: https://github.com/apache/beam/pull/22130

   This fixes #22129 by possibly writing multiple shards per bundle.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb merged pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb merged PR #22130:
URL: https://github.com/apache/beam/pull/22130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r915257613


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -68,6 +68,9 @@ def __init__(
       shard_name_template=None,
       mime_type='application/octet-stream',
       compression_type=CompressionTypes.AUTO,
+      *,

Review Comment:
   I don't think we have any guidance here (other than that which is generic to Python, which would indicate most of these arguments should be passed by keyword). 



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -108,6 +111,8 @@ def __init__(
         shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self.max_records_per_shard = max_records_per_shard
+    self.max_bytes_per_shard = max_bytes_per_shard

Review Comment:
   Nice catch. Fixed so that both take effect. 



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -848,8 +848,12 @@ class Writer(object):
   See ``iobase.Sink`` for more detailed documentation about the process of
   writing to a sink.
   """
-  def write(self, value):
-    """Writes a value to the sink using the current writer."""
+  def write(self, value) -> Optional[bool]:

Review Comment:
   It's backwards compatible, which is why I made it Optional. But I've moved to using at_capacity as suggested instead. 



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:

Review Comment:
   Done.



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
     if self.writer is None:
       # We ignore UUID collisions here since they are extremely rare.
       self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    self.writer.write(element)
+    if self.writer.write(element):

Review Comment:
   Done.



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   Unfortunately io.BufferedWriter.write returns the number of bytes written _for that call_, not a running total. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r914880819


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -68,6 +68,9 @@ def __init__(
       shard_name_template=None,
       mime_type='application/octet-stream',
       compression_type=CompressionTypes.AUTO,
+      *,

Review Comment:
   Do we have code style guide about the usage of asterisk in function parameters?



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -848,8 +848,12 @@ class Writer(object):
   See ``iobase.Sink`` for more detailed documentation about the process of
   writing to a sink.
   """
-  def write(self, value):
-    """Writes a value to the sink using the current writer."""
+  def write(self, value) -> Optional[bool]:

Review Comment:
   Better not change the signature of base class. 



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:

Review Comment:
   If write still does not return, could create another method like "at_capacity" returns true if the writer has reached capacity. Also in this way `max_bytes_per_shard` and `max_records_per_shard` can have effect at the same time.



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
     if self.writer is None:
       # We ignore UUID collisions here since they are extremely rare.
       self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    self.writer.write(element)
+    if self.writer.write(element):

Review Comment:
   always call self.writer.write and then test if self.writer.at_capacity.



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   Can bytes_written be handled also in write function as num_records_written thus no need for the wrapped class? FileBasedSink.open used to return an instance of BufferedWriter always but if use this wrapped class it now it may not.



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -108,6 +111,8 @@ def __init__(
         shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self.max_records_per_shard = max_records_per_shard
+    self.max_bytes_per_shard = max_bytes_per_shard

Review Comment:
   From the implementation of write below, only one of them will take effect. Do we need to raise a warning (or info) to remind possible misuse when neither is None? Also need to document this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1172779296

   R: @abacn


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1179252832

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1179423377

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1179252745

   Unrelated failure in apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers.test_pardo_large_input
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1289288250

   This has not been plumbed through for `WriteToParquet`, but the underlying infrastructure should work there (see the changes to sdks/python/apache_beam/io/textio.py -- just adding arguments and passing them through). I'd be happy to help you with a pull request. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r915149270


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   Found that io.BufferedWriter.write returns the number of bytes written (https://docs.python.org/3.7/library/io.html#io.BufferedWriter.write) so bytes_written can be traced directly in FileBasedSinkWriter.write



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1172764444

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @tvalentyn for label python.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1172719507

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22130](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fc81f14) into [master](https://codecov.io/gh/apache/beam/commit/52e1b3ff9e9b1c5067b59bbb8053f0bc53f9679f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (52e1b3f) will **increase** coverage by `0.02%`.
   > The diff coverage is `96.42%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22130      +/-   ##
   ==========================================
   + Coverage   73.99%   74.01%   +0.02%     
   ==========================================
     Files         703      703              
     Lines       92936    93015      +79     
   ==========================================
   + Hits        68769    68847      +78     
   - Misses      22901    22902       +1     
     Partials     1266     1266              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.59% <96.42%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/textio.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vdGV4dGlvLnB5) | `97.05% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/filebasedsink.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWJhc2Vkc2luay5weQ==) | `95.94% <95.83%> (-0.04%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/iobase.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vaW9iYXNlLnB5) | `86.31% <100.00%> (+0.06%)` | :arrow_up: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.17% <0.00%> (-0.50%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `92.57% <0.00%> (ø)` | |
   | [.../python/apache\_beam/typehints/trivial\_inference.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3RyaXZpYWxfaW5mZXJlbmNlLnB5) | `96.41% <0.00%> (ø)` | |
   | [...thon/apache\_beam/ml/inference/pytorch\_inference.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3B5dG9yY2hfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
   | [...am/examples/inference/pytorch\_language\_modeling.py](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfbGFuZ3VhZ2VfbW9kZWxpbmcucHk=) | `0.00% <0.00%> (ø)` | |
   | ... and [9 more](https://codecov.io/gh/apache/beam/pull/22130/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [52e1b3f...fc81f14](https://codecov.io/gh/apache/beam/pull/22130?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r914941476


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   I see, if the writer is compressed, record sends to FileBasedWriter may have different length to the record actually written and that's why a wrapped class is needed here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1172779766

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lhoestq commented on pull request #22130: Allow one to bound the size of output shards when writing to files.

Posted by GitBox <gi...@apache.org>.
lhoestq commented on PR #22130:
URL: https://github.com/apache/beam/pull/22130#issuecomment-1288753604

   Hi ! is this available for `WriteToParquet` as well ? I couldn't find a way to do it with Parquet files


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org