Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/06 14:50:17 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #22130: Allow one to bound the size of output shards when writing to files.

Abacn commented on code in PR #22130:
URL: https://github.com/apache/beam/pull/22130#discussion_r914880819


##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -68,6 +68,9 @@ def __init__(
       shard_name_template=None,
       mime_type='application/octet-stream',
       compression_type=CompressionTypes.AUTO,
+      *,

Review Comment:
   Do we have a code style guide on using a bare asterisk (`*`) to mark keyword-only function parameters?
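
   For context, a minimal sketch of what the bare asterisk does in a Python signature. The function `make_sink_options` is hypothetical and is not part of Beam; it only borrows the parameter names from this PR.

```python
def make_sink_options(file_path_prefix, compression_type="auto", *,
                      max_records_per_shard=None, max_bytes_per_shard=None):
    """Hypothetical helper: parameters after the bare * are keyword-only."""
    return {
        "file_path_prefix": file_path_prefix,
        "compression_type": compression_type,
        "max_records_per_shard": max_records_per_shard,
        "max_bytes_per_shard": max_bytes_per_shard,
    }


# Passing the new parameter by keyword works.
options = make_sink_options("out", max_records_per_shard=100)

# Passing it positionally is rejected at call time with a TypeError.
try:
    make_sink_options("out", "auto", 100)
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

   The practical effect is that new parameters cannot be confused with existing positional ones, so they can later be reordered or extended without breaking callers.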



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -848,8 +848,12 @@ class Writer(object):
   See ``iobase.Sink`` for more detailed documentation about the process of
   writing to a sink.
   """
-  def write(self, value):
-    """Writes a value to the sink using the current writer."""
+  def write(self, value) -> Optional[bool]:

Review Comment:
   Better not to change the signature of the base class.



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:

Review Comment:
   If `write` still does not return a value, we could add another method such as `at_capacity` that returns true once the writer has reached capacity. That way `max_bytes_per_shard` and `max_records_per_shard` can both take effect at the same time.
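
   A rough sketch of this suggestion, under stated assumptions: `SketchSink` and `SketchWriter` are hypothetical stand-ins rather than Beam classes, values are assumed to be `bytes`, and the byte count is approximated by `len(value)`.

```python
import io


class SketchSink:
    """Hypothetical stand-in for the sink; it only carries the two limits."""
    def __init__(self, max_records_per_shard=None, max_bytes_per_shard=None):
        self.max_records_per_shard = max_records_per_shard
        self.max_bytes_per_shard = max_bytes_per_shard

    def write_record(self, handle, value):
        handle.write(value)


class SketchWriter:
    """Hypothetical writer: write() returns nothing, at_capacity() checks limits."""
    def __init__(self, sink, handle):
        self.sink = sink
        self.handle = handle
        self.num_records_written = 0
        self.num_bytes_written = 0

    def write(self, value):
        self.sink.write_record(self.handle, value)
        self.num_records_written += 1
        # Assumption: the size of a record is the length of its bytes value.
        self.num_bytes_written += len(value)

    def at_capacity(self):
        # Each limit is checked independently, so both can be active at once.
        records_full = (
            self.sink.max_records_per_shard is not None
            and self.num_records_written >= self.sink.max_records_per_shard)
        bytes_full = (
            self.sink.max_bytes_per_shard is not None
            and self.num_bytes_written >= self.sink.max_bytes_per_shard)
        return records_full or bytes_full


sink = SketchSink(max_records_per_shard=3, max_bytes_per_shard=10)
writer = SketchWriter(sink, io.BytesIO())
writer.write(b"abcdef")
first_check = writer.at_capacity()   # 1 record, 6 bytes: under both limits
writer.write(b"ghijkl")
second_check = writer.at_capacity()  # 2 records, 12 bytes: byte limit reached
```

   Keeping `write` free of a return value leaves the base-class signature untouched, and whichever limit is hit first closes the shard.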



##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
     if self.writer is None:
       # We ignore UUID collisions here since they are extremely rare.
       self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    self.writer.write(element)
+    if self.writer.write(element):

Review Comment:
   Always call `self.writer.write` and then test `self.writer.at_capacity`.
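
   A minimal sketch of that control flow, assuming `at_capacity` is a method. `CountingWriter` and `SketchWriteFn` are hypothetical and only mimic the shape of the `process` method in this hunk; they are not the Beam implementation.

```python
import uuid


class CountingWriter:
    """Hypothetical writer that reports full after `limit` records."""
    def __init__(self, shard_id, limit):
        self.shard_id = shard_id
        self.limit = limit
        self.records = []

    def write(self, value):
        self.records.append(value)

    def at_capacity(self):
        return len(self.records) >= self.limit

    def close(self):
        return (self.shard_id, list(self.records))


class SketchWriteFn:
    """Hypothetical stand-in for the DoFn that owns the current writer."""
    def __init__(self, limit):
        self.limit = limit
        self.writer = None
        self.closed_shards = []

    def process(self, element):
        if self.writer is None:
            self.writer = CountingWriter(str(uuid.uuid4()), self.limit)
        # Always write first, then ask the writer whether it is full.
        self.writer.write(element)
        if self.writer.at_capacity():
            self.closed_shards.append(self.writer.close())
            self.writer = None

    def finish(self):
        # Flush the last, possibly partial, shard.
        if self.writer is not None:
            self.closed_shards.append(self.writer.close())
            self.writer = None


fn = SketchWriteFn(limit=2)
for element in range(5):
    fn.process(element)
fn.finish()
shard_sizes = [len(records) for _, records in fn.closed_shards]
```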



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -406,10 +417,33 @@ def __init__(self, sink, temp_shard_path):
     self.sink = sink
     self.temp_shard_path = temp_shard_path
     self.temp_handle = self.sink.open(temp_shard_path)
+    self.num_records_written = 0
 
   def write(self, value):
     self.sink.write_record(self.temp_handle, value)
+    if self.sink.max_records_per_shard:
+      self.num_records_written += 1
+      return self.num_records_written >= self.sink.max_records_per_shard
+    if self.sink.max_bytes_per_shard:
+      return (
+          self.sink.byte_counter.bytes_written >= self.sink.max_bytes_per_shard)
 
   def close(self):
     self.sink.close(self.temp_handle)
     return self.temp_shard_path
+
+
+class _ByteCountingWriter:

Review Comment:
   Can `bytes_written` also be tracked in the `write` function, the same way as `num_records_written`, so that the wrapper class is not needed? `FileBasedSink.open` used to always return an instance of `BufferedWriter`, but with this wrapper class it may not.
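
   To illustrate the type concern with plain standard-library objects: `CountingWrapper` below is a hypothetical wrapper, similar in spirit to `_ByteCountingWriter` but not the PR's code. The alternative shown assumes the handle supports `tell()`, which may not hold for every file handle Beam returns.

```python
import io


class CountingWrapper:
    """Hypothetical wrapper that counts bytes passed to write()."""
    def __init__(self, inner):
        self._inner = inner
        self.bytes_written = 0

    def write(self, data):
        self.bytes_written += len(data)
        return self._inner.write(data)


def bytes_written_since(handle, start_offset):
    """Count bytes without a wrapper, assuming the handle supports tell()."""
    return handle.tell() - start_offset


# Wrapping changes the type that callers of open() would see.
wrapped = CountingWrapper(io.BufferedWriter(io.BytesIO()))
wrapper_is_buffered_writer = isinstance(wrapped, io.BufferedWriter)

# Counting from the handle's own offset keeps the original type.
handle = io.BufferedWriter(io.BytesIO())
start = handle.tell()
handle.write(b"hello\n")
written = bytes_written_since(handle, start)
```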



##########
sdks/python/apache_beam/io/filebasedsink.py:
##########
@@ -108,6 +111,8 @@ def __init__(
         shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self.max_records_per_shard = max_records_per_shard
+    self.max_bytes_per_shard = max_bytes_per_shard

Review Comment:
   From the implementation of `write` below, only one of them will take effect. Should we raise a warning (or log an info message) to flag possible misuse when neither is None? This also needs to be documented.
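
   One possible shape for such a check, as a sketch only. `check_shard_limits` is a hypothetical helper, and the choice of `warnings.warn` over a logging call is an assumption rather than an established Beam convention.

```python
import warnings


def check_shard_limits(max_records_per_shard=None, max_bytes_per_shard=None):
    """Hypothetical check: warn when both limits are supplied together."""
    both_set = (
        max_records_per_shard is not None and max_bytes_per_shard is not None)
    if both_set:
        warnings.warn(
            "Both max_records_per_shard and max_bytes_per_shard are set; "
            "with the write() in this diff only max_records_per_shard "
            "is checked.",
            UserWarning)
    return both_set


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    both = check_shard_limits(max_records_per_shard=10, max_bytes_per_shard=1024)
    only_one = check_shard_limits(max_records_per_shard=10)
    warning_count = len(caught)
```

   If both limits are later made to apply independently, this check becomes unnecessary and only the documentation needs to state that the shard closes at whichever limit is reached first.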


