You are viewing a plain-text version of this content. The canonical link to the original ("here") was not preserved in this plain-text copy.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/25 11:39:23 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10794: ARROW-13441: [C++][CSV] Skip empty batches in column decoder

pitrou commented on a change in pull request #10794:
URL: https://github.com/apache/arrow/pull/10794#discussion_r695665572



##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -358,6 +358,44 @@ TEST(StreamingReaderTest, BytesRead) {
   }
 }
 
+TEST(StreamingReaderTests, SkipMultipleEmptyBlocksAtStart) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer = std::make_shared<Buffer>(
+      "aaa,bbb,ccc\n123,456,789\n101,112,131\n415,161,718\n192,021,222\n324,252,627\n"
+      "282,930,313\n233,343,536\n");
+
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 34;
+  read_options.skip_rows_after_names = 6;
+
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_EQ(12, streaming_reader->bytes_read());
+
+  // The first batch should have the one and only row in it
+  ASSERT_OK(streaming_reader->ReadNext(&batch));
+  ASSERT_NE(nullptr, batch.get());
+  ASSERT_EQ(1, batch->num_rows());
+  ASSERT_EQ(96, streaming_reader->bytes_read());
+
+  auto schema = streaming_reader->schema();
+  ASSERT_EQ(3, schema->num_fields());
+  ASSERT_EQ("aaa", schema->field(0)->name());
+  ASSERT_EQ(Type::INT64, schema->field(0)->type()->id());
+  ASSERT_EQ("bbb", schema->field(1)->name());
+  ASSERT_EQ(Type::INT64, schema->field(1)->type()->id());
+  ASSERT_EQ("ccc", schema->field(2)->name());
+  ASSERT_EQ(Type::INT64, schema->field(2)->type()->id());
+

Review comment:
       Can you also test the contents of `batch`? You can use `RecordBatchFromJSON` to construct the expected record batch.

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -358,6 +358,44 @@ TEST(StreamingReaderTest, BytesRead) {
   }
 }
 
+TEST(StreamingReaderTests, SkipMultipleEmptyBlocksAtStart) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer = std::make_shared<Buffer>(
+      "aaa,bbb,ccc\n123,456,789\n101,112,131\n415,161,718\n192,021,222\n324,252,627\n"
+      "282,930,313\n233,343,536\n");
+
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 34;
+  read_options.skip_rows_after_names = 6;
+
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_EQ(12, streaming_reader->bytes_read());
+
+  // The first batch should have the one and only row in it
+  ASSERT_OK(streaming_reader->ReadNext(&batch));
+  ASSERT_NE(nullptr, batch.get());
+  ASSERT_EQ(1, batch->num_rows());
+  ASSERT_EQ(96, streaming_reader->bytes_read());
+
+  auto schema = streaming_reader->schema();
+  ASSERT_EQ(3, schema->num_fields());
+  ASSERT_EQ("aaa", schema->field(0)->name());
+  ASSERT_EQ(Type::INT64, schema->field(0)->type()->id());
+  ASSERT_EQ("bbb", schema->field(1)->name());
+  ASSERT_EQ(Type::INT64, schema->field(1)->type()->id());
+  ASSERT_EQ("ccc", schema->field(2)->name());
+  ASSERT_EQ(Type::INT64, schema->field(2)->type()->id());

Review comment:
       You can construct an expected schema and call `AssertSchemaEqual`.

##########
File path: python/pyarrow/tests/test_csv.py
##########
@@ -609,28 +489,184 @@ def test_skip_rows_after_names(self):
             assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                     table_dict[name])
 
-    def test_header_column_names(self):
+    def test_row_number_offset_in_errors(self, use_threads):
+        # Row numbers are only correctly counted in serial reads
+        def format_msg(msg_format, row, *args):
+            if use_threads:
+                row_info = ""
+            else:
+                row_info = "Row #{}: ".format(row)
+            return msg_format.format(row_info, *args)
+
+        csv, _ = make_random_csv(4, 100, write_names=True)
+
+        read_options = ReadOptions()
+        read_options.block_size = len(csv) / 3
+        convert_options = ConvertOptions()
+        convert_options.column_types = {"a": pa.int32()}
+
+        # Test without skip_rows and column names in the csv
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            102, csv)
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        long_row = (b"this is a long row" * 15) + b",3\r\n"
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test skipping rows after the names
+        read_options.skip_rows_after_names = 47
+
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        read_options.skip_rows_after_names = 0
+
+        # Test without skip_rows and column names not in the csv
+        csv, _ = make_random_csv(4, 100, write_names=False)
+        read_options.column_names = ["a", "b", "c", "d"]
+        csv_bad_columns = csv + b"1,2\r\n"
+        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_columns_long = csv + long_row
+        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
+                                  long_row[0:96].decode("utf-8"))
+        with pytest.raises(pa.ArrowInvalid, match=message_long):
+            self.read_bytes(csv_bad_columns_long, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        csv_bad_type = csv + b"a,b,c,d\r\n"
+        message_value = format_msg(
+            "In CSV column #0: {}"
+            "CSV conversion error to int32: invalid value 'a'",
+            101)
+        message_value = message_value.format(len(csv))
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        # Test with skip_rows and column names not in the csv
+        read_options.skip_rows = 23
+        with pytest.raises(pa.ArrowInvalid, match=message_columns):
+            self.read_bytes(csv_bad_columns, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+        with pytest.raises(pa.ArrowInvalid, match=message_value):
+            self.read_bytes(csv_bad_type, use_threads,
+                            read_options=read_options,
+                            convert_options=convert_options)
+
+
+@pytest.mark.parametrize('use_threads', [False, True])

Review comment:
       Not fond of this reorganization at all. Can't your test fit in the existing structure?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org