Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/06 21:24:37 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

robertwb commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r915257613


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -68,6 +68,9 @@ def __init__(
       shard_name_template=None,
       mime_type='application/octet-stream',
       compression_type=CompressionTypes.AUTO,
+      *,

Review Comment:
   I don't think we have any guidance of our own here. General Python practice applies, and it suggests that most of these arguments should be passed by keyword.
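
A minimal sketch of what the bare `*` in the hunk above does, assuming a simplified constructor. `ExampleSink` and its parameter list are hypothetical stand-ins for illustration, not Beam's `FileBasedSink`; only the two `max_*_per_shard` names come from the diff.

```python
class ExampleSink:
    """Hypothetical stand-in for a sink constructor; not Beam's FileBasedSink."""

    def __init__(
        self,
        file_path_prefix,
        num_shards=0,
        *,  # every parameter after this marker must be passed by keyword
        max_records_per_shard=None,
        max_bytes_per_shard=None,
    ):
        self.file_path_prefix = file_path_prefix
        self.num_shards = num_shards
        self.max_records_per_shard = max_records_per_shard
        self.max_bytes_per_shard = max_bytes_per_shard


# Keyword use works as usual.
sink = ExampleSink("/tmp/out", max_records_per_shard=1000)

# A third positional argument is rejected with a TypeError.
try:
    ExampleSink("/tmp/out", 0, 1000)
    rejected = False
except TypeError:
    rejected = True
```

Placing new options after the `*` means later additions cannot silently change the meaning of existing positional calls.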



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -108,6 +111,8 @@ def __init__(
         shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self.max_records_per_shard = max_records_per_shard
+    self.max_bytes_per_shard = max_bytes_per_shard

Review Comment:
   Nice catch. Fixed so that both take effect. 
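
One way "both take effect" could look, as a sketch only: each configured limit is checked independently, and either one can mark the shard as full. `ShardLimits` and `reached` are hypothetical names introduced here; the actual fix in the PR may be structured differently.

```python
class ShardLimits:
    """Hypothetical helper; attribute names mirror the diff, but this is not Beam code."""

    def __init__(self, max_records_per_shard=None, max_bytes_per_shard=None):
        self.max_records_per_shard = max_records_per_shard
        self.max_bytes_per_shard = max_bytes_per_shard

    def reached(self, num_records_written, bytes_written):
        # Evaluate each limit on its own so that setting one does not
        # short-circuit the other. An unset (None or 0) limit never trips.
        records_full = bool(
            self.max_records_per_shard
            and num_records_written >= self.max_records_per_shard)
        bytes_full = bool(
            self.max_bytes_per_shard
            and bytes_written >= self.max_bytes_per_shard)
        return records_full or bytes_full


limits = ShardLimits(max_records_per_shard=100, max_bytes_per_shard=1024)
```

With both limits set, a shard holding few but large records is still reported as full once the byte limit is crossed.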



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -848,8 +848,12 @@ class Writer(object):
   See ``iobase.Sink`` for more detailed documentation about the process of
   writing to a sink.
   """
-  def write(self, value):
-    """Writes a value to the sink using the current writer."""
+  def write(self, value) -> Optional[bool]:

Review Comment:
   The change is backwards compatible, which is why I made the return type Optional. That said, I've switched to using at_capacity instead, as suggested.
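
A sketch of how a separate `at_capacity` check can stay backwards compatible, assuming it is a method on the base writer that defaults to "never full". All class and function names below are hypothetical, and the exact signature used in the PR is not shown in this thread.

```python
class Writer:
    """Hypothetical base class, loosely modeled on the Writer in the diff."""

    def write(self, value):
        raise NotImplementedError

    def at_capacity(self):
        # Default: never full, so existing subclasses keep working unchanged.
        return False

    def close(self):
        raise NotImplementedError


class ListWriter(Writer):
    """An older-style writer that knows nothing about capacity."""

    def __init__(self):
        self.values = []

    def write(self, value):
        self.values.append(value)

    def close(self):
        return list(self.values)


class BoundedListWriter(ListWriter):
    """A writer that reports itself full after max_records values."""

    def __init__(self, max_records):
        super().__init__()
        self.max_records = max_records

    def at_capacity(self):
        return len(self.values) >= self.max_records


def write_all(elements, open_writer):
    """Writes elements, opening a new writer whenever the current one is full."""
    shards = []
    writer = None
    for element in elements:
        if writer is None:
            writer = open_writer()
        writer.write(element)
        if writer.at_capacity():
            shards.append(writer.close())
            writer = None
    if writer is not None:
        shards.append(writer.close())
    return shards
```

Here `write_all(range(5), lambda: BoundedListWriter(2))` splits the input into several shards, while `write_all(range(5), ListWriter)` produces a single shard because the inherited `at_capacity` always returns `False`.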



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:

Review Comment:
   Done.



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
     if self.writer is None:
       # We ignore UUID collisions here since they are extremely rare.
       self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    self.writer.write(element)
+    if self.writer.write(element):

Review Comment:
   Done.



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   Unfortunately io.BufferedWriter.write returns the number of bytes written _for that call_, not a running total. 
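
To illustrate the point, a small sketch of a wrapper that keeps its own running total. `ByteCountingWriter` here is a hypothetical example written for this note, not the `_ByteCountingWriter` from the PR, whose body is not shown in this thread.

```python
import io


class ByteCountingWriter:
    """Hypothetical wrapper that tracks the total number of bytes written."""

    def __init__(self, writer):
        self.writer = writer
        self.bytes_written = 0

    def write(self, data):
        # Accumulate across calls; the wrapped writer only reports per-call counts.
        self.bytes_written += len(data)
        return self.writer.write(data)

    def flush(self):
        self.writer.flush()


raw = io.BytesIO()
counting = ByteCountingWriter(io.BufferedWriter(raw))
first = counting.write(b"hello")     # per-call count for this write
second = counting.write(b", world")  # per-call count for this write
counting.flush()
```

After the two writes, `first` and `second` are the per-call counts (5 and 7), while `counting.bytes_written` holds the running total of 12.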


