You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/08/04 16:22:19 UTC
[arrow] branch master updated: ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null
This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6a889bc ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null
6a889bc is described below
commit 6a889bc6174ecc6a7bf32847774fdd928b7c619c
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Tue Aug 4 12:21:30 2020 -0400
ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null
`ConvertOptions::include_missing_columns = true` was insufficient to produce the required behavior for missing columns: we need to read the CSV file's header to find the names of the columns actually present in the file before instantiating a StreamingReader. Otherwise, the StreamingReader fills absent columns with `null`, which prevents the projector from materializing them correctly later.
Closes #7896 from bkietz/9609-csv-empty-virtual
Authored-by: Benjamin Kietzman <be...@gmail.com>
Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
cpp/src/arrow/compute/kernels/scalar_string.cc | 4 +-
cpp/src/arrow/dataset/file_csv.cc | 93 +++++++++++++++++++-------
cpp/src/arrow/dataset/file_csv_test.cc | 17 +++++
cpp/src/arrow/dataset/file_ipc_test.cc | 18 +++++
r/tests/testthat/test-dataset.R | 17 +++++
5 files changed, 123 insertions(+), 26 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/scalar_string.cc b/cpp/src/arrow/compute/kernels/scalar_string.cc
index 0d6b8da..a0ea240 100644
--- a/cpp/src/arrow/compute/kernels/scalar_string.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_string.cc
@@ -861,10 +861,10 @@ void AddBinaryLength(FunctionRegistry* registry) {
applicator::ScalarUnaryNotNull<Int32Type, StringType, BinaryLength>::Exec;
ArrayKernelExec exec_offset_64 =
applicator::ScalarUnaryNotNull<Int64Type, LargeStringType, BinaryLength>::Exec;
- for (const auto input_type : {binary(), utf8()}) {
+ for (const auto& input_type : {binary(), utf8()}) {
DCHECK_OK(func->AddKernel({input_type}, int32(), exec_offset_32));
}
- for (const auto input_type : {large_binary(), large_utf8()}) {
+ for (const auto& input_type : {large_binary(), large_utf8()}) {
DCHECK_OK(func->AddKernel({input_type}, int64(), exec_offset_64));
}
DCHECK_OK(registry->AddFunction(std::move(func)));
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index f077ea2..3df9fa8 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -20,9 +20,11 @@
#include <algorithm>
#include <memory>
#include <string>
+#include <unordered_set>
#include <utility>
#include "arrow/csv/options.h"
+#include "arrow/csv/parser.h"
#include "arrow/csv/reader.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
@@ -39,49 +41,92 @@ namespace dataset {
using internal::checked_cast;
using internal::checked_pointer_cast;
+Result<std::unordered_set<std::string>> GetColumnNames(
+ const csv::ParseOptions& parse_options, util::string_view first_block,
+ MemoryPool* pool) {
+ uint32_t parsed_size = 0;
+ csv::BlockParser parser(pool, parse_options, /*num_cols=*/-1,
+ /*max_num_rows=*/1);
+
+ RETURN_NOT_OK(parser.Parse(util::string_view{first_block}, &parsed_size));
+
+ if (parser.num_rows() != 1) {
+ return Status::Invalid(
+ "Could not read first row from CSV file, either "
+ "file is truncated or header is larger than block size");
+ }
+
+ if (parser.num_cols() == 0) {
+ return Status::Invalid("No columns in CSV file");
+ }
+
+ std::unordered_set<std::string> column_names;
+
+ RETURN_NOT_OK(
+ parser.VisitLastRow([&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
+ util::string_view view{reinterpret_cast<const char*>(data), size};
+ if (column_names.emplace(view.to_string()).second) {
+ return Status::OK();
+ }
+ return Status::Invalid("CSV file contained multiple columns named ", view);
+ }));
+
+ return column_names;
+}
+
static inline Result<csv::ConvertOptions> GetConvertOptions(
- const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
- auto options = csv::ConvertOptions::Defaults();
- if (scan_options != nullptr) {
- // This is set to true to match behavior with other formats; a missing column
- // will be materialized as null.
- options.include_missing_columns = true;
-
- for (const auto& field : scan_options->schema()->fields()) {
- options.column_types[field->name()] = field->type();
- options.include_columns.push_back(field->name());
- }
+ const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options,
+ const Buffer& first_block, MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto column_names,
+ GetColumnNames(format.parse_options, util::string_view{first_block}, pool));
+
+ auto convert_options = csv::ConvertOptions::Defaults();
+
+ for (const auto& field : scan_options->schema()->fields()) {
+ if (column_names.find(field->name()) == column_names.end()) continue;
+ convert_options.column_types[field->name()] = field->type();
+ convert_options.include_columns.push_back(field->name());
+ }
- // FIXME(bkietz) also acquire types of fields materialized but not projected.
- for (auto&& name : FieldsInExpression(scan_options->filter)) {
- ARROW_ASSIGN_OR_RAISE(auto match,
- FieldRef(name).FindOneOrNone(*scan_options->schema()));
- if (match.indices().empty()) {
- options.include_columns.push_back(std::move(name));
- }
+ // FIXME(bkietz) also acquire types of fields materialized but not projected.
+ for (auto&& name : FieldsInExpression(scan_options->filter)) {
+ ARROW_ASSIGN_OR_RAISE(auto match,
+ FieldRef(name).FindOneOrNone(*scan_options->schema()));
+ if (match.indices().empty()) {
+ convert_options.include_columns.push_back(std::move(name));
}
}
- return options;
+ return convert_options;
}
static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
- auto options = csv::ReadOptions::Defaults();
+ auto read_options = csv::ReadOptions::Defaults();
// Multithreaded conversion of individual files would lead to excessive thread
// contention when ScanTasks are also executed in multiple threads, so we disable it
// here.
- options.use_threads = false;
- return options;
+ read_options.use_threads = false;
+ return read_options;
}
static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const FileSource& source, const CsvFileFormat& format,
- const std::shared_ptr<ScanOptions>& options = nullptr,
+ const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto reader_options = GetReadOptions(format);
+ ARROW_ASSIGN_OR_RAISE(auto first_block, input->ReadAt(0, reader_options.block_size));
+ RETURN_NOT_OK(input->Seek(0));
+
const auto& parse_options = format.parse_options;
- ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto convert_options,
+ scan_options == nullptr
+ ? ToResult(csv::ConvertOptions::Defaults())
+ : GetConvertOptions(format, scan_options, *first_block, pool));
+
auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
parse_options, convert_options);
if (!maybe_reader.ok()) {
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 9c54eb4..5c189f0 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -83,6 +83,23 @@ TEST_F(TestCsvFileFormat, ScanRecordBatchReader) {
ASSERT_EQ(row_count, 3);
}
+TEST_F(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
+ auto source = GetFileSource();
+
+ opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())}));
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+ int64_t row_count = 0;
+
+ for (auto maybe_batch : Batches(fragment.get())) {
+ ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+ AssertSchemaEqual(*batch->schema(), *schema_);
+ row_count += batch->num_rows();
+ }
+
+ ASSERT_EQ(row_count, 3);
+}
+
TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) {
auto source = GetFileSource("");
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 747dccf..c557621 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -128,6 +128,24 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
ASSERT_EQ(row_count, kNumRows);
}
+TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
+ auto reader = GetRecordBatchReader();
+ auto source = GetFileSource(reader.get());
+
+ opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())}));
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+ int64_t row_count = 0;
+
+ for (auto maybe_batch : Batches(fragment.get())) {
+ ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+ AssertSchemaEqual(*batch->schema(), *schema_);
+ row_count += batch->num_rows();
+ }
+
+ ASSERT_EQ(row_count, kNumRows);
+}
+
TEST_F(TestIpcFileFormat, WriteRecordBatchReader) {
std::shared_ptr<RecordBatchReader> reader = GetRecordBatchReader();
auto source = GetFileSource(reader.get());
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index f806866..e78dfd3 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -112,6 +112,12 @@ test_that("Simple interface for datasets", {
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
+
+ # Collecting virtual partition column works
+ expect_equal(
+ collect(ds) %>% pull(part),
+ c(rep(1, 10), rep(2, 10))
+ )
})
test_that("dim method returns the correct number of rows and columns",{
@@ -219,6 +225,12 @@ test_that("IPC/Feather format data", {
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
+
+ # Collecting virtual partition column works
+ expect_equal(
+ collect(ds) %>% pull(part),
+ c(rep(3, 10), rep(4, 10))
+ )
})
test_that("CSV dataset", {
@@ -239,6 +251,11 @@ test_that("CSV dataset", {
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
+ # Collecting virtual partition column works
+ expect_equal(
+ collect(ds) %>% pull(part),
+ c(rep(5, 10), rep(6, 10))
+ )
})
test_that("Other text delimited dataset", {