You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/29 05:31:40 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #15775: [BEAM-12730] Python. Custom delimiter add corner case

tvalentyn commented on a change in pull request #15775:
URL: https://github.com/apache/beam/pull/15775#discussion_r738927345



##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -113,6 +113,9 @@ def __init__(self,
         `header_matcher` are both provided, the value of `skip_header_lines`
         lines will be skipped and the header will be processed from
         there.
+      delimiter (bytes) Optional: delimiter to split records.
+        Must not self-overlap, because self-overlapping delimiters cause
+        ambiguous parsing at the edge of bundles.

Review comment:
       ```suggestion
           ambiguous parsing.
   ```

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +272,31 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
-          # Found a '\r\n'. Accepting that as the next separator.
+        if self._delimiter is None \

Review comment:
       Prefer parentheses for implicit line continuation: https://docs.python.org/2.7/howto/doanddont.html#using-backslash-to-continue-statements

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -568,7 +613,9 @@ def __init__(
         skipped from each source file. Must be 0 or higher. Large number of
         skipped lines might impact performance.
       coder (~apache_beam.coders.coders.Coder): Coder used to decode each line.
-      delimiter (bytes): delimiter to split records
+      delimiter (bytes) Optional: delimiter to split records.
+        Must not self-overlap, because self-overlapping delimiters cause
+        ambiguous parsing at the edge of bundles.

Review comment:
       ```suggestion
           ambiguous parsing.
   ```

##########
File path: sdks/python/apache_beam/io/textio_test.py
##########
@@ -1023,13 +1031,75 @@ def test_read_after_splitting_skip_header(self):
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)
 
+  def test_custom_delimiter_read_from_text(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name, delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_read_all_single_file(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Create' >> Create(
+          [file_name]) | 'ReadAll' >> ReadAllFromText(delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_must_not_empty_bytes(self):

Review comment:
       ```suggestion
     def test_invalid_delimiters_are_rejected(self):
   ```

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -172,7 +172,12 @@ def split_points_unclaimed(stop_position):
         # belongs to the current bundle, hence ignoring that is incorrect.
         # Seeking to one byte before prevents that.
 
-        file_to_read.seek(start_offset - 1)
+        if self._delimiter is not None and start_offset >= len(self._delimiter):

Review comment:
       @dmitriikuzinepam please mark open comments as resolved after they are addressed, or reply with something like 'done'.

##########
File path: sdks/python/apache_beam/io/textio_test.py
##########
@@ -1023,13 +1031,75 @@ def test_read_after_splitting_skip_header(self):
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)
 
+  def test_custom_delimiter_read_from_text(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name, delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_read_all_single_file(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Create' >> Create(
+          [file_name]) | 'ReadAll' >> ReadAllFromText(delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_must_not_empty_bytes(self):
+    file_name, _ = write_data(1)
+    for delimiter in (b'', '', '\r\n', 'a', 1):
+      with self.assertRaises(
+          ValueError, msg='Delimiter must be a non-empty bytes sequence.'):
+        _ = TextSource(
+            file_pattern=file_name,
+            min_bundle_size=0,
+            buffer_size=6,
+            compression_type=CompressionTypes.UNCOMPRESSED,
+            strip_trailing_newlines=True,
+            coder=coders.StrUtf8Coder(),
+            delimiter=delimiter,
+        )
+
+  def test_custom_delimiter_must_not_self_overlap_ok(self):

Review comment:
       (comment becomes unnecessary since the name is self-explanatory)

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -329,6 +358,15 @@ def _read_record(self, file_to_read, read_buffer):
           read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
           sep_bounds[1] - record_start_position_in_buffer)
 
+  @staticmethod
+  def _is_self_overlapping(delimiter):
+    # delimiter self-overlaps if v exists such as delimiter = vu = wv

Review comment:
       ```suggestion
       # A delimiter self-overlaps if it has a prefix that is also its suffix.
   ```

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -167,22 +175,27 @@ def split_points_unclaimed(stop_position):
           self._process_header(file_to_read, read_buffer))
       start_offset = max(start_offset, position_after_processing_header_lines)
       if start_offset > position_after_processing_header_lines:
-        # Seeking to one position before the start index and ignoring the
-        # current line. If start_position is at beginning if the line, that line
-        # belongs to the current bundle, hence ignoring that is incorrect.
-        # Seeking to one byte before prevents that.
+        # Seeking to one delimiter length before the start index and ignoring
+        # the current line. If start_position is at beginning if the line, that
+        # line belongs to the current bundle, hence ignoring that is incorrect.
+        # Seeking to one delimiter before prevents that.
 
-        file_to_read.seek(start_offset - 1)
+        if self._delimiter is not None and start_offset >= len(self._delimiter):
+          required_position = start_offset - len(self._delimiter)
+        else:
+          required_position = start_offset - 1
+
+        file_to_read.seek(required_position)
         read_buffer.reset()
         sep_bounds = self._find_separator_bounds(file_to_read, read_buffer)
         if not sep_bounds:
-          # Could not find a separator after (start_offset - 1). This means that
+          # Could not find a separator after required_position. This means that

Review comment:
       nit: consider using the term "delimiter" consistently throughout this file, to avoid any question of whether "delimiter" and "separator" mean the same thing.

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -496,6 +537,9 @@ def __init__(
       with_filename: If True, returns a Key Value with the key being the file
         name and the value being the actual data. If False, it only returns
         the data.
+      delimiter (bytes) Optional: delimiter to split records.
+        Must not self-overlap, because self-overlapping delimiters cause
+        ambiguous parsing at the edge of bundles.

Review comment:
       ```suggestion
           ambiguous parsing.
   ```

##########
File path: sdks/python/apache_beam/io/textio_test.py
##########
@@ -1023,13 +1031,75 @@ def test_read_after_splitting_skip_header(self):
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)
 
+  def test_custom_delimiter_read_from_text(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name, delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_read_all_single_file(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Create' >> Create(
+          [file_name]) | 'ReadAll' >> ReadAllFromText(delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_must_not_empty_bytes(self):
+    file_name, _ = write_data(1)
+    for delimiter in (b'', '', '\r\n', 'a', 1):
+      with self.assertRaises(
+          ValueError, msg='Delimiter must be a non-empty bytes sequence.'):
+        _ = TextSource(
+            file_pattern=file_name,
+            min_bundle_size=0,
+            buffer_size=6,
+            compression_type=CompressionTypes.UNCOMPRESSED,
+            strip_trailing_newlines=True,
+            coder=coders.StrUtf8Coder(),
+            delimiter=delimiter,
+        )
+
+  def test_custom_delimiter_must_not_self_overlap_ok(self):
+    """Non self-overlapping delimiter is accepted."""
+    file_name, _ = write_data(1)
+    for delimiter in (b'\n', b'\r\n', b'*', b'abc', b'cabdab', b'abcabd'):
+      _ = TextSource(
+          file_pattern=file_name,
+          min_bundle_size=0,
+          buffer_size=6,
+          compression_type=CompressionTypes.UNCOMPRESSED,
+          strip_trailing_newlines=True,
+          coder=coders.StrUtf8Coder(),
+          delimiter=delimiter,
+      )
+
+  def test_custom_delimiter_must_not_self_overlap_error(self):

Review comment:
       ```suggestion
     def test_self_overlapping_delimiter_is_rejected(self):
   ```

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +272,31 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
-          # Found a '\r\n'. Accepting that as the next separator.
+        if self._delimiter is None \
+                and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+          # Found a '\r\n' if delimiter is not define.

Review comment:
       ```suggestion
             # Accept both '\r\n' and '\n' as a default delimiter.
   ```
   

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +272,31 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)

Review comment:
       1. Comment above no longer necessary now that we have custom delims.
   
   2. let's rename `next_lf`. `next_delim` perhaps?

##########
File path: sdks/python/apache_beam/io/textio_test.py
##########
@@ -1023,13 +1031,75 @@ def test_read_after_splitting_skip_header(self):
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)
 
+  def test_custom_delimiter_read_from_text(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name, delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_read_all_single_file(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Create' >> Create(
+          [file_name]) | 'ReadAll' >> ReadAllFromText(delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_must_not_empty_bytes(self):
+    file_name, _ = write_data(1)
+    for delimiter in (b'', '', '\r\n', 'a', 1):
+      with self.assertRaises(
+          ValueError, msg='Delimiter must be a non-empty bytes sequence.'):
+        _ = TextSource(
+            file_pattern=file_name,
+            min_bundle_size=0,
+            buffer_size=6,
+            compression_type=CompressionTypes.UNCOMPRESSED,
+            strip_trailing_newlines=True,
+            coder=coders.StrUtf8Coder(),
+            delimiter=delimiter,
+        )
+
+  def test_custom_delimiter_must_not_self_overlap_ok(self):

Review comment:
       ```suggestion
     def test_non_self_overlapping_delimiter_is_accepted(self):
   ```

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +272,31 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
-          # Found a '\r\n'. Accepting that as the next separator.
+        if self._delimiter is None \
+                and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+          # Found a '\r\n' if delimiter is not define.
+          # Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
+          # User defined delimiter

Review comment:
       The default delimiter can also hit this branch. I suggest removing these comment lines.

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -262,33 +267,27 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
       next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter is None and delimiter == b'\n' \
+        if self._delimiter is None \
                 and read_buffer.data[next_lf - 1:next_lf] == b'\r':
-          # Default delimiter
+          # Default b'\n' or user defined delimiter
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
           # User defined delimiter
           # Found a delimiter. Accepting that as the next separator.
           return (next_lf, next_lf + delimiter_len)
 
-      elif read_buffer.data.find(delimiter[0], current_pos) >= 0:
-        # Corner case: delimiter truncated at the end of the file
-        current_delimiter_pos = read_buffer.data.find(delimiter[0], current_pos)
-
-        i = 0
-        while i < len(delimiter) and read_buffer.data[current_delimiter_pos +
-                                                      i] == delimiter[i]:
-          i += 1
-          if not self._try_to_ensure_num_bytes_in_buffer(
-              file_to_read, read_buffer, current_delimiter_pos + i + 1):
-            break
-
-        if i == delimiter_len:
-          # All bytes of delimiter found
-          return current_delimiter_pos, current_delimiter_pos + delimiter_len
-
-        current_pos += i
+      elif self._delimiter is not None:
+        # Corner case: custom delimiter is truncated at the end of the buffer.
+        next_lf = read_buffer.data.find(

Review comment:
       Thanks a lot @nikie, @dmitriikuzinepam for bringing up this corner case and for all the due diligence.
   
   What happens if the buffer boundary falls in the middle of a '\r\n'? Does a stray '\r' by chance get appended to the input?

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +272,31 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
-          # Found a '\r\n'. Accepting that as the next separator.
+        if self._delimiter is None \
+                and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+          # Found a '\r\n' if delimiter is not define.
+          # Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
+          # User defined delimiter
           # Found a delimiter. Accepting that as the next separator.
           return (next_lf, next_lf + delimiter_len)
 
+      elif self._delimiter is not None:
+        # Corner case: custom delimiter is truncated at the end of the buffer.
+        next_lf = read_buffer.data.find(
+            delimiter[0], len(read_buffer.data) - delimiter_len + 1)
+        if next_lf >= 0:
+          # The first possible start of a truncated delimiter,

Review comment:
       Do you mean something like:
   
             # Delimiters longer than 1 byte may cross the buffer boundary.
             # Defer full matching till the next iteration.

##########
File path: sdks/python/apache_beam/io/textio_test.py
##########
@@ -1023,13 +1031,75 @@ def test_read_after_splitting_skip_header(self):
     self.assertEqual(expected_data[2:], reference_lines)
     self.assertEqual(reference_lines, split_lines)
 
+  def test_custom_delimiter_read_from_text(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name, delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_read_all_single_file(self):
+    file_name, expected_data = write_data(
+      5, eol=EOL.CUSTOM_DELIMITER, custom_delimiter=b'@#')
+    assert len(expected_data) == 5
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'Create' >> Create(
+          [file_name]) | 'ReadAll' >> ReadAllFromText(delimiter=b'@#')
+      assert_that(pcoll, equal_to(expected_data))
+
+  def test_custom_delimiter_must_not_empty_bytes(self):
+    file_name, _ = write_data(1)
+    for delimiter in (b'', '', '\r\n', 'a', 1):
+      with self.assertRaises(
+          ValueError, msg='Delimiter must be a non-empty bytes sequence.'):
+        _ = TextSource(
+            file_pattern=file_name,
+            min_bundle_size=0,
+            buffer_size=6,
+            compression_type=CompressionTypes.UNCOMPRESSED,
+            strip_trailing_newlines=True,
+            coder=coders.StrUtf8Coder(),
+            delimiter=delimiter,
+        )
+
+  def test_custom_delimiter_must_not_self_overlap_ok(self):

Review comment:
       it's also easy to understand the test when the name includes the gist of the test scenario and expected result.

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -113,6 +113,9 @@ def __init__(self,
         `header_matcher` are both provided, the value of `skip_header_lines`
         lines will be skipped and the header will be processed from
         there.
+      delimiter (bytes) Optional: delimiter to split records.
+        Must not self-overlap, because self-overlapping delimiters cause
+        ambiguous parsing at the edge of bundles.

Review comment:
       (for brevity and to avoid introducing additional terms). ditto below in the user-facing portion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org