You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/23 17:20:47 UTC

[GitHub] [arrow] n3world opened a new pull request #10794: ARROW-13441: [CSV] Skip empty batches in column decoder

n3world opened a new pull request #10794:
URL: https://github.com/apache/arrow/pull/10794


   When the infering column decoder encounters an empty batch just return an empty array immediatly and do not consider the batch as the first batch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-886812143


   > For the first block it happens at StreamingReaderImpl::InitAfterFirstBatch:920-925. Looking again at this the schema is already captured so that is an issue.
   
   That was my impression as well. 
   
   In any case, it would be nice to add an explicit StreamingReader test on the C++ side for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-887069211


   > I think the cleanest might be to filter empty parsed blocks when looking for the first batch. This should be safe (as long as you pass in rb_gen and not filtered_rb_gen to use after you got the first batch):
   
   I did something along these lines in my last update which essentially recursively called InitAfterFirstBatch until either the RecordBatch was null or it had rows. I could probably update that approach to use this new filter if you would prefer that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10794:
URL: https://github.com/apache/arrow/pull/10794


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10794:
URL: https://github.com/apache/arrow/pull/10794


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r698845320



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       @pitrou It's fine to insist.  I feel somewhat responsible for introducing the schism in the first place so I've submitted a PR @n3world that should go back to the old subclassing scheme: https://github.com/n3world/arrow/pull/2




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r696652788



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       Thanks for doing that. I merged in your fix and updated the c++ unit tests




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-887029318


   Yes, I removed `SerialStreamingReader::ReadNextSkippingEmpty` semi-intentionally as I thought there could only be one skipped batch.  However, I hadn't realized that the right combinations of skip settings could yield multiple empty parsed blocks so that logic doesn't work.  Filtering in parallel is tricky and rather than solve that I think we can get away with filtering on one of the serial spots.
   
   Here is a commit that adds a filtering utility to the async generators: https://github.com/westonpace/arrow/commit/8d80722a8b8cd304dff8322e96d76dfd75899ea7
   
   I think the cleanest might be to filter empty parsed blocks when looking for the first batch.  This should be safe (as long as you pass in rb_gen and not filtered_rb_gen to use after you got the first batch):
   ```
       auto filtered_rb_gen = MakeFilteredGenerator(rb_gen, std::move(not_empty));
       return filtered_rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
         return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead);
       });
   ```
   Or we could filter empty batches on the other side of the readahead generator.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-886756489


   > Yes that will because the type is unknown, yet. This test seems artificial in that it doesn't follow how the column decoder is actually used. In use all empty record batches get discarded so their type don't actually matter.
   
   Ah, I see. Can you point me where this happens exactly, though?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699493243



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       Thank you very much @n3world !

##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       Hmm, you're right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699458154



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       @pitrou, I added @westonpace change and simplified it a bit so that the only method actually in the test classes is the use_threads property method.

##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       The NotImplementedError shouldn't be needed since the class cannot be created if an abstract method is not implemented

##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       I just realized there is an `abstractproperty` decorator which I should have used here. Guess I'll fix that in my next change someday




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r698474140



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       Ah. I'm sorry to insist, but I'd prefer that we avoid this kind of obscure feature/hack. Surely there is a way to make these additional tests fit with the original subclassing scheme?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699458154



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       @pitrou, I added @westonpace change and simplified it a bit so that the only method actually in the test classes is the use_threads property method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699609897



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       The NotImplementedError shouldn't be needed since the class cannot be created if an abstract method is not implemented




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r677556221



##########
File path: cpp/src/arrow/csv/column_decoder.cc
##########
@@ -198,6 +198,11 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
 
 Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
     const std::shared_ptr<BlockParser>& parser) {
+  if (parser->num_rows() == 0) {

Review comment:
       I think I'll keep empty array because as you point out nullptr usually indicates EOF and to avoid any dereference issues from the downstream converters which assume no nulls.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699493243



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       Thank you very much @n3world !




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699615244



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       I just realized there is an `abstractproperty` decorator which I should have used here. Guess I'll fix that in my next change someday




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace edited a comment on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace edited a comment on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-887029318


   Yes, I removed `SerialStreamingReader::ReadNextSkippingEmpty` semi-intentionally as I thought there could only be one skipped batch.  However, I hadn't realized that the right combinations of skip settings could yield multiple empty parsed blocks so that logic doesn't work.  Filtering in parallel is tricky and rather than solve that I think we can get away with filtering on one of the serial spots.
   
   Here is a commit that adds a filtering utility to the async generators: https://github.com/westonpace/arrow/commit/8d80722a8b8cd304dff8322e96d76dfd75899ea7
   
   I think the cleanest might be to filter empty parsed blocks when looking for the first batch.  This should be safe (as long as you pass in rb_gen and not filtered_rb_gen to use after you got the first batch):
   ```
       auto filtered_rb_gen = MakeFilteredGenerator(rb_gen, std::move(not_empty));
       return filtered_rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
         return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead);
       });
   ```
   Or we could filter empty batches on the other side of the readahead generator but then you'd have to figure out the schema problem.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-886799871


   > > Yes that will because the type is unknown, yet. This test seems artificial in that it doesn't follow how the column decoder is actually used. In use all empty record batches get discarded so their type don't actually matter.
   > 
   > Ah, I see. Can you point me where this happens exactly, though?
   
   For the first block it happens at `StreamingReaderImpl::InitAfterFirstBatch:920-925`. Looking again at this the schema is already captured so that is an issue.
   
   I don't seem to be able to find where it happens other than the first block. I know it use to be in the previous streaming reader, SerialStreamingReader::ReadNextSkippingEmpty. @westonpace was this an intentional change or an accident? Either way this does need a little more work in the csv reader to be able to handle consuming the empty leading blocks and capturing the schema after the first one with data.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r699618474



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -332,11 +332,13 @@ def read_bytes(self, b, **kwargs):
         :param kwargs: arguments passed on to open the csv file
         :return: b parsed as a single RecordBatch
         """
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
     def use_threads(self):
-        """Return true if this test is multi-threaded"""
+        """Whether this test is multi-threaded"""
+        raise NotImplementedError

Review comment:
       Hmm, you're right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r695665572



##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -358,6 +358,44 @@ TEST(StreamingReaderTest, BytesRead) {
   }
 }
 
+TEST(StreamingReaderTests, SkipMultipleEmptyBlocksAtStart) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer = std::make_shared<Buffer>(
+      "aaa,bbb,ccc\n123,456,789\n101,112,131\n415,161,718\n192,021,222\n324,252,627\n"
+      "282,930,313\n233,343,536\n");
+
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 34;
+  read_options.skip_rows_after_names = 6;
+
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_EQ(12, streaming_reader->bytes_read());
+
+  // The first batch should have the one and only row in it
+  ASSERT_OK(streaming_reader->ReadNext(&batch));
+  ASSERT_NE(nullptr, batch.get());
+  ASSERT_EQ(1, batch->num_rows());
+  ASSERT_EQ(96, streaming_reader->bytes_read());
+
+  auto schema = streaming_reader->schema();
+  ASSERT_EQ(3, schema->num_fields());
+  ASSERT_EQ("aaa", schema->field(0)->name());
+  ASSERT_EQ(Type::INT64, schema->field(0)->type()->id());
+  ASSERT_EQ("bbb", schema->field(1)->name());
+  ASSERT_EQ(Type::INT64, schema->field(1)->type()->id());
+  ASSERT_EQ("ccc", schema->field(2)->name());
+  ASSERT_EQ(Type::INT64, schema->field(2)->type()->id());
+

Review comment:
       Can you also test the contents of `batch`? You can use `RecordBatchFromJSON` to construct the expected record batch.

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -358,6 +358,44 @@ TEST(StreamingReaderTest, BytesRead) {
   }
 }
 
+TEST(StreamingReaderTests, SkipMultipleEmptyBlocksAtStart) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer = std::make_shared<Buffer>(
+      "aaa,bbb,ccc\n123,456,789\n101,112,131\n415,161,718\n192,021,222\n324,252,627\n"
+      "282,930,313\n233,343,536\n");
+
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 34;
+  read_options.skip_rows_after_names = 6;
+
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_EQ(12, streaming_reader->bytes_read());
+
+  // The first batch should have the one and only row in it
+  ASSERT_OK(streaming_reader->ReadNext(&batch));
+  ASSERT_NE(nullptr, batch.get());
+  ASSERT_EQ(1, batch->num_rows());
+  ASSERT_EQ(96, streaming_reader->bytes_read());
+
+  auto schema = streaming_reader->schema();
+  ASSERT_EQ(3, schema->num_fields());
+  ASSERT_EQ("aaa", schema->field(0)->name());
+  ASSERT_EQ(Type::INT64, schema->field(0)->type()->id());
+  ASSERT_EQ("bbb", schema->field(1)->name());
+  ASSERT_EQ(Type::INT64, schema->field(1)->type()->id());
+  ASSERT_EQ("ccc", schema->field(2)->name());
+  ASSERT_EQ(Type::INT64, schema->field(2)->type()->id());

Review comment:
       You can construct an expected schema and call `AssertSchemaEqual`.

##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       Not fond of this reorganization at all. Can't your test fit in the existing structure?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10794: ARROW-13441: [CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-885785604


   https://issues.apache.org/jira/browse/ARROW-13441


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r698474140



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       Ah. I'm sorry to insist, but I'd prefer that we avoid this kind of obscure feature/hack. Surely there is a way to make these additional tests fit with the original subclassing scheme?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r698845320



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -321,156 +322,31 @@ def test_write_options():
         opts.validate()
 
 
-class BaseTestCSV:
-    """Common tests which are shared by streaming and non streaming readers"""
-
-    def base_row_number_offset_in_errors(self, use_threads, read_bytes,
-                                         num_blocks=3):
-        """
-        num_blocks is a temporary work around because streaming reader does
-        not get schema from first non empty block
-        """
-
-        # Row numbers are only correctly counted in serial reads
-        def format_msg(msg_format, row, *args):
-            if use_threads:
-                row_info = ""
-            else:
-                row_info = "Row #{}: ".format(row)
-            return msg_format.format(row_info, *args)
-
-        csv, _ = make_random_csv(4, 100, write_names=True)
-
-        read_options = ReadOptions()
-        read_options.block_size = len(csv) / num_blocks
-        convert_options = ConvertOptions()
-        convert_options.column_types = {"a": pa.int32()}
-
-        # Test without skip_rows and column names in the csv
-        csv_bad_columns = csv + b"1,2\r\n"
-        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
-        with pytest.raises(pa.ArrowInvalid, match=message_columns):
-            read_bytes(csv_bad_columns, read_options=read_options,
-                       convert_options=convert_options)
-
-        csv_bad_type = csv + b"a,b,c,d\r\n"
-        message_value = format_msg(
-            "In CSV column #0: {}"
-            "CSV conversion error to int32: invalid value 'a'",
-            102, csv)
-        with pytest.raises(pa.ArrowInvalid, match=message_value):
-            read_bytes(csv_bad_type, read_options=read_options,
-                       convert_options=convert_options)
-
-        long_row = (b"this is a long row" * 15) + b",3\r\n"
-        csv_bad_columns_long = csv + long_row
-        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
-                                  long_row[0:96].decode("utf-8"))
-        with pytest.raises(pa.ArrowInvalid, match=message_long):
-            read_bytes(csv_bad_columns_long, read_options=read_options,
-                       convert_options=convert_options)
+@pytest.fixture(params=[True, False], ids=['threaded', 'serial'])
+def use_threads(request):
+    return request.param

Review comment:
       @pitrou It's fine to insist.  I feel somewhat responsible for introducing the schism in the first place so I've submitted a PR @n3world that should go back to the old subclassing scheme: https://github.com/n3world/arrow/pull/2




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r695743700



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       Hmm, sorry, that's a bit of an annoyance indeed. Personally, I would rather not have additional `use_threads` arguments in every test function, so I prefer the original subclassing approach.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-905130494


   Might be good to mention @pitrou with the question to make sure it gets noticed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-904809196


   > In any case, it would be nice to add an explicit StreamingReader test on the C++ side for this.
   
   Are the tests I added sufficient?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r696111634



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       Sorry for the trouble.  pytest is obtuse but infinitely configurable.  I've pushed a PR to your branch that should let us keep `use_threads` as a proper pytest parameter but avoid repeating it everywhere.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r677115339



##########
File path: cpp/src/arrow/csv/column_decoder.cc
##########
@@ -198,6 +198,11 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
 
 Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
     const std::shared_ptr<BlockParser>& parser) {
+  if (parser->num_rows() == 0) {

Review comment:
       ```suggestion
     // Note: Care should be taken to discard these batches as the data type will be null
     // and won't match future decoded arrays.
     if (parser->num_rows() == 0) {
   ```
   
   Nit: Not necessary but might help explain to future readers.  Optionally you could also return `nullptr` here so it is clear it is invalid but then that will cause some confusion below between EOF and invalidly parsed batch.  I'll defer to whatever you think is best.

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -901,13 +901,31 @@ class StreamingReaderImpl : public ReaderMixin,
 
     auto self = shared_from_this();
     return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
-      return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead);
+      return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
     });
   }
 
-  Status InitAfterFirstBatch(const DecodedBlock& first_block,
-                             AsyncGenerator<DecodedBlock> batch_gen, int max_readahead) {
-    schema_ = first_block.record_batch->schema();
+  Future<> InitFromBlock(const DecodedBlock& block,
+                         AsyncGenerator<DecodedBlock> batch_gen, int max_readahead,
+                         int64_t prev_bytes_processed) {
+    if (!block.record_batch) {
+      // End of file just return null batches
+      record_batch_gen_ = []() { return std::shared_ptr<RecordBatch>(); };

Review comment:
       ```suggestion
         record_batch_gen_ = MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#issuecomment-886728438


   > The problem with this approach is that yields column chunks of differing types. You can see this if you add the following test:
   
   Yes that will because the type is unknown, yet. This test seems artificial in that it doesn't follow how the column decoder is actually used. In use all empty record batches get discarded so their type don't actually matter. That is why it works for the csv streaming test I modified to have multiple empty blocks before a block with data.
   
   That test would not work without this change so the change does not make anything better or worse as far as that test goes. I would argue it makes things better because eventually you can get data from the converter.
   
   The only ways I can think to get that test to work would be to allow the API not to have to return a result until the type is known but that is dependent on the read ahead, for csv, to be high enough to actually find data.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r679930283



##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -901,13 +901,31 @@ class StreamingReaderImpl : public ReaderMixin,
 
     auto self = shared_from_this();
     return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
-      return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead);
+      return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
     });
   }
 
-  Status InitAfterFirstBatch(const DecodedBlock& first_block,
-                             AsyncGenerator<DecodedBlock> batch_gen, int max_readahead) {
-    schema_ = first_block.record_batch->schema();
+  Future<> InitFromBlock(const DecodedBlock& block,
+                         AsyncGenerator<DecodedBlock> batch_gen, int max_readahead,
+                         int64_t prev_bytes_processed) {
+    if (!block.record_batch) {
+      // End of file just return null batches
+      record_batch_gen_ = []() { return std::shared_ptr<RecordBatch>(); };

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r695731681



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       My goal was to unify the test structure of the table and stream tests so that tests can be written once and executed for both readers. I opted to go to the parameter approach because that was recently done for the streaming tests and assumed since that was more recent it was the more desired style. I can revert it back to the inheritance approach instead but that would mean I would want to undo the parameterization that @westonpace recently did on the streaming tests.
   
   It doesn't matter to me which approach is used but it makes it a lot easier if it is the same style for testing the different readers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r695780581



##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       I do agree that the extra `use_threads` argument is a little annoying for every test. I wish the parameter was passed to the class init rather than the test method then it would only be in one spot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org