Posted to commits@arrow.apache.org by bk...@apache.org on 2020/08/04 16:22:19 UTC

[arrow] branch master updated: ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a889bc  ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null
6a889bc is described below

commit 6a889bc6174ecc6a7bf32847774fdd928b7c619c
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Tue Aug 4 12:21:30 2020 -0400

    ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null
    
    `ConvertOptions::include_missing_columns = true` was not sufficient to get the required behavior for missing columns: we need to read the CSV file's header to find the names of the columns actually present in the file before instantiating a StreamingReader. Otherwise the StreamingReader fills absent columns with `null`, which prevents the projector from materializing them correctly later. (A standalone sketch of this approach follows the file summary below.)
    
    Closes #7896 from bkietz/9609-csv-empty-virtual
    
    Authored-by: Benjamin Kietzman <be...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/compute/kernels/scalar_string.cc |  4 +-
 cpp/src/arrow/dataset/file_csv.cc              | 93 +++++++++++++++++++-------
 cpp/src/arrow/dataset/file_csv_test.cc         | 17 +++++
 cpp/src/arrow/dataset/file_ipc_test.cc         | 18 +++++
 r/tests/testthat/test-dataset.R                | 17 +++++
 5 files changed, 123 insertions(+), 26 deletions(-)
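
The gist of the fix, as a standalone sketch: probe the file's header for the columns it actually contains, then request only those from the CSV reader, leaving absent (virtual) columns for the projector. This assumes the Arrow 1.0-era C++ CSV API; ProbeHeader is a hypothetical stand-in for the GetColumnNames helper added in file_csv.cc below.

    #include <memory>
    #include <string>
    #include <unordered_set>
    #include <vector>

    #include "arrow/buffer.h"
    #include "arrow/csv/options.h"
    #include "arrow/csv/reader.h"
    #include "arrow/io/interfaces.h"
    #include "arrow/memory_pool.h"
    #include "arrow/result.h"
    #include "arrow/status.h"

    namespace csv = arrow::csv;

    // Hypothetical: parse the header row out of the first block and return the
    // column names present in the file (the patch's GetColumnNames does this
    // with csv::BlockParser).
    arrow::Result<std::unordered_set<std::string>> ProbeHeader(
        const csv::ParseOptions& parse_options, const arrow::Buffer& first_block);

    arrow::Result<std::shared_ptr<csv::StreamingReader>> OpenCsv(
        std::shared_ptr<arrow::io::RandomAccessFile> input,
        const csv::ParseOptions& parse_options,
        const std::vector<std::string>& requested_columns) {
      auto read_options = csv::ReadOptions::Defaults();
      auto convert_options = csv::ConvertOptions::Defaults();

      // Read the first block, learn which columns the file really has, then
      // rewind so the reader sees the file from the start.
      ARROW_ASSIGN_OR_RAISE(auto first_block,
                            input->ReadAt(0, read_options.block_size));
      ARROW_ASSIGN_OR_RAISE(auto present,
                            ProbeHeader(parse_options, *first_block));
      RETURN_NOT_OK(input->Seek(0));

      // Only request columns that are present; the scanner's projector will
      // materialize the rest (e.g. partition columns) afterwards.
      for (const auto& name : requested_columns) {
        if (present.count(name) > 0) {
          convert_options.include_columns.push_back(name);
        }
      }
      return csv::StreamingReader::Make(arrow::default_memory_pool(),
                                        std::move(input), read_options,
                                        parse_options, convert_options);
    }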

diff --git a/cpp/src/arrow/compute/kernels/scalar_string.cc b/cpp/src/arrow/compute/kernels/scalar_string.cc
index 0d6b8da..a0ea240 100644
--- a/cpp/src/arrow/compute/kernels/scalar_string.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_string.cc
@@ -861,10 +861,10 @@ void AddBinaryLength(FunctionRegistry* registry) {
       applicator::ScalarUnaryNotNull<Int32Type, StringType, BinaryLength>::Exec;
   ArrayKernelExec exec_offset_64 =
       applicator::ScalarUnaryNotNull<Int64Type, LargeStringType, BinaryLength>::Exec;
-  for (const auto input_type : {binary(), utf8()}) {
+  for (const auto& input_type : {binary(), utf8()}) {
     DCHECK_OK(func->AddKernel({input_type}, int32(), exec_offset_32));
   }
-  for (const auto input_type : {large_binary(), large_utf8()}) {
+  for (const auto& input_type : {large_binary(), large_utf8()}) {
     DCHECK_OK(func->AddKernel({input_type}, int64(), exec_offset_64));
   }
   DCHECK_OK(registry->AddFunction(std::move(func)));
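
The scalar_string.cc hunk above is an unrelated tidy-up: binary(), utf8() and friends return std::shared_ptr<DataType>, so iterating the braced list by value copy-constructed a shared_ptr (with an atomic refcount bump) on every iteration; binding by const reference avoids the per-element copy. A minimal standalone illustration:

    #include <memory>

    void Iterate() {
      auto a = std::make_shared<int>(1);
      auto b = std::make_shared<int>(2);
      // By value, each iteration copies a shared_ptr; by const reference, no
      // per-element copy (the initializer list itself still holds copies).
      for (const auto& p : {a, b}) {
        (void)*p;
      }
    }
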
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index f077ea2..3df9fa8 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -20,9 +20,11 @@
 #include <algorithm>
 #include <memory>
 #include <string>
+#include <unordered_set>
 #include <utility>
 
 #include "arrow/csv/options.h"
+#include "arrow/csv/parser.h"
 #include "arrow/csv/reader.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/file_base.h"
@@ -39,49 +41,92 @@ namespace dataset {
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 
+Result<std::unordered_set<std::string>> GetColumnNames(
+    const csv::ParseOptions& parse_options, util::string_view first_block,
+    MemoryPool* pool) {
+  uint32_t parsed_size = 0;
+  csv::BlockParser parser(pool, parse_options, /*num_cols=*/-1,
+                          /*max_num_rows=*/1);
+
+  RETURN_NOT_OK(parser.Parse(util::string_view{first_block}, &parsed_size));
+
+  if (parser.num_rows() != 1) {
+    return Status::Invalid(
+        "Could not read first row from CSV file, either "
+        "file is truncated or header is larger than block size");
+  }
+
+  if (parser.num_cols() == 0) {
+    return Status::Invalid("No columns in CSV file");
+  }
+
+  std::unordered_set<std::string> column_names;
+
+  RETURN_NOT_OK(
+      parser.VisitLastRow([&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
+        util::string_view view{reinterpret_cast<const char*>(data), size};
+        if (column_names.emplace(view.to_string()).second) {
+          return Status::OK();
+        }
+        return Status::Invalid("CSV file contained multiple columns named ", view);
+      }));
+
+  return column_names;
+}
+
 static inline Result<csv::ConvertOptions> GetConvertOptions(
-    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
-  auto options = csv::ConvertOptions::Defaults();
-  if (scan_options != nullptr) {
-    // This is set to true to match behavior with other formats; a missing column
-    // will be materialized as null.
-    options.include_missing_columns = true;
-
-    for (const auto& field : scan_options->schema()->fields()) {
-      options.column_types[field->name()] = field->type();
-      options.include_columns.push_back(field->name());
-    }
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options,
+    const Buffer& first_block, MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(
+      auto column_names,
+      GetColumnNames(format.parse_options, util::string_view{first_block}, pool));
+
+  auto convert_options = csv::ConvertOptions::Defaults();
+
+  for (const auto& field : scan_options->schema()->fields()) {
+    if (column_names.find(field->name()) == column_names.end()) continue;
+    convert_options.column_types[field->name()] = field->type();
+    convert_options.include_columns.push_back(field->name());
+  }
 
-    // FIXME(bkietz) also acquire types of fields materialized but not projected.
-    for (auto&& name : FieldsInExpression(scan_options->filter)) {
-      ARROW_ASSIGN_OR_RAISE(auto match,
-                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
-      if (match.indices().empty()) {
-        options.include_columns.push_back(std::move(name));
-      }
+  // FIXME(bkietz) also acquire types of fields materialized but not projected.
+  for (auto&& name : FieldsInExpression(scan_options->filter)) {
+    ARROW_ASSIGN_OR_RAISE(auto match,
+                          FieldRef(name).FindOneOrNone(*scan_options->schema()));
+    if (match.indices().empty()) {
+      convert_options.include_columns.push_back(std::move(name));
     }
   }
-  return options;
+  return convert_options;
 }
 
 static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
-  auto options = csv::ReadOptions::Defaults();
+  auto read_options = csv::ReadOptions::Defaults();
   // Multithreaded conversion of individual files would lead to excessive thread
   // contention when ScanTasks are also executed in multiple threads, so we disable it
   // here.
-  options.use_threads = false;
-  return options;
+  read_options.use_threads = false;
+  return read_options;
 }
 
 static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
     const FileSource& source, const CsvFileFormat& format,
-    const std::shared_ptr<ScanOptions>& options = nullptr,
+    const std::shared_ptr<ScanOptions>& scan_options = nullptr,
     MemoryPool* pool = default_memory_pool()) {
   ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
 
   auto reader_options = GetReadOptions(format);
+  ARROW_ASSIGN_OR_RAISE(auto first_block, input->ReadAt(0, reader_options.block_size));
+  RETURN_NOT_OK(input->Seek(0));
+
   const auto& parse_options = format.parse_options;
-  ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto convert_options,
+      scan_options == nullptr
+          ? ToResult(csv::ConvertOptions::Defaults())
+          : GetConvertOptions(format, scan_options, *first_block, pool));
+
   auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
                                                  parse_options, convert_options);
   if (!maybe_reader.ok()) {
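
For contrast, a minimal sketch of the failure mode this patch removes, assuming the Arrow 1.0-era csv::TableReader API: when a requested column is absent from the file and include_missing_columns is set, the reader fabricates an all-null column for it, which is exactly what kept the dataset projector from substituting partition values.

    #include <iostream>
    #include <memory>

    #include "arrow/buffer.h"
    #include "arrow/csv/options.h"
    #include "arrow/csv/reader.h"
    #include "arrow/io/memory.h"
    #include "arrow/memory_pool.h"
    #include "arrow/table.h"

    int main() {
      auto input = std::make_shared<arrow::io::BufferReader>(
          arrow::Buffer::FromString("f64,i32\n1.0,2\n3.5,4\n"));

      auto convert_options = arrow::csv::ConvertOptions::Defaults();
      convert_options.include_columns = {"f64", "part"};  // "part" not in the file
      convert_options.include_missing_columns = true;     // pre-patch behavior

      auto reader = arrow::csv::TableReader::Make(
                        arrow::default_memory_pool(), input,
                        arrow::csv::ReadOptions::Defaults(),
                        arrow::csv::ParseOptions::Defaults(), convert_options)
                        .ValueOrDie();
      auto table = reader->Read().ValueOrDie();
      // "part" comes back as a column of nulls rather than being left for the
      // projector to fill with the partition value.
      std::cout << table->schema()->ToString() << std::endl;
      return 0;
    }
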
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 9c54eb4..5c189f0 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -83,6 +83,23 @@ TEST_F(TestCsvFileFormat, ScanRecordBatchReader) {
   ASSERT_EQ(row_count, 3);
 }
 
+TEST_F(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
+  auto source = GetFileSource();
+
+  opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())}));
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  int64_t row_count = 0;
+
+  for (auto maybe_batch : Batches(fragment.get())) {
+    ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+    AssertSchemaEqual(*batch->schema(), *schema_);
+    row_count += batch->num_rows();
+  }
+
+  ASSERT_EQ(row_count, 3);
+}
+
 TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) {
   auto source = GetFileSource("");
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 747dccf..c557621 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -128,6 +128,24 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
   ASSERT_EQ(row_count, kNumRows);
 }
 
+TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
+  auto reader = GetRecordBatchReader();
+  auto source = GetFileSource(reader.get());
+
+  opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())}));
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  int64_t row_count = 0;
+
+  for (auto maybe_batch : Batches(fragment.get())) {
+    ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+    AssertSchemaEqual(*batch->schema(), *schema_);
+    row_count += batch->num_rows();
+  }
+
+  ASSERT_EQ(row_count, kNumRows);
+}
+
 TEST_F(TestIpcFileFormat, WriteRecordBatchReader) {
   std::shared_ptr<RecordBatchReader> reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index f806866..e78dfd3 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -112,6 +112,12 @@ test_that("Simple interface for datasets", {
       filter(integer > 6) %>%
       summarize(mean = mean(integer))
   )
+
+  # Collecting virtual partition column works
+  expect_equal(
+    collect(ds) %>% pull(part),
+    c(rep(1, 10), rep(2, 10))
+  )
 })
 
 test_that("dim method returns the correct number of rows and columns",{
@@ -219,6 +225,12 @@ test_that("IPC/Feather format data", {
       filter(integer > 6) %>%
       summarize(mean = mean(integer))
   )
+
+  # Collecting virtual partition column works
+  expect_equal(
+    collect(ds) %>% pull(part),
+    c(rep(3, 10), rep(4, 10))
+  )
 })
 
 test_that("CSV dataset", {
@@ -239,6 +251,11 @@ test_that("CSV dataset", {
       filter(integer > 6) %>%
       summarize(mean = mean(integer))
   )
+  # Collecting virtual partition column works
+  expect_equal(
+    collect(ds) %>% pull(part),
+    c(rep(5, 10), rep(6, 10))
+  )
 })
 
 test_that("Other text delimited dataset", {