You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/24 19:11:54 UTC

[GitHub] [arrow] bkietz opened a new pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

bkietz opened a new pull request #7033:
URL: https://github.com/apache/arrow/pull/7033


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-621418575


   Reverted everything except parse_options. Follow up: https://issues.apache.org/jira/browse/ARROW-8631


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416239202



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       The edge case here is due to the way we pass only the projected schema during a scan, so maybe the solution is to always pass the full dataset schema as well so that no materialized fields will be inferring at scan time




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r417959216



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;

Review comment:
       I would expect the FilterAndProjectScanTask to fix this in a more efficient way.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.
+  options.use_threads = false;
+  return options;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format,
+    const std::shared_ptr<ScanOptions>& options = nullptr,

Review comment:
       I'd take a ConvertOption instead of a ScanTask.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.
+  options.use_threads = false;
+  return options;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format,
+    const std::shared_ptr<ScanOptions>& options = nullptr,
+    MemoryPool* pool = default_memory_pool()) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto reader_options = GetReadOptions(format);
+  const auto& parse_options = format.parse_options;
+  ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
+  auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
+                                                 parse_options, convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open CSV input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader,
+                          OpenReader(source_, *format_, options(), context()->pool));
+    return IteratorFromReader(std::move(reader));
+  }
+
+ private:
+  std::shared_ptr<const CsvFileFormat> format_;
+  FileSource source_;
+};
+
+Result<bool> CsvFileFormat::IsSupported(const FileSource& source) const {
+  RETURN_NOT_OK(source.Open().status());

Review comment:
       Can't you let OpenReader fail?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415958451



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to Csv files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

Review comment:
       Definitely you can set a full ParseOptions. I haven't exposed any of ReadOptions and ConvertOptions will probably need separate handling, as noted below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-619195315


   https://issues.apache.org/jira/browse/ARROW-7759


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416090623



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       PTAL. There's a subtlety resulting from the loosely typed nature of CSV: we don't know the types of columns materialized but not projected, which could lead to some very nasty errors in edge cases. For example, if filter were `"a"_ == "b"_` but only column `"c"` were projected then we have no information about the types of fields a, b except that they must be identical. I'm not sure what the correct solution here is




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] nealrichardson commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415887520



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to Csv files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

Review comment:
       Can I set a full `csv::ParseOptions` object? We have builders for those in R already.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416110664



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       That's indeed a data error. Whether parquet or csv, in C++ this would be caught by the ScannerBuilder::Filter setter (which *does* have access to the dataset's schema), so maybe this edge case isn't interesting after all




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415971988



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;

Review comment:
       will do




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-621229069


   @jorisvandenbossche 
   
   > do we plan to use the CsvFileFormat also for writing? ... For Parquet there is a ReaderOptions that grouped all options related to reading in the ParquetFileFormat
   
   We added ReaderOptions to avoid collisions with extant parquet write options. Since we don't have CSV write options avoiding collisions seems like specualtive generality; this can be handled when/if we add CSV writing. Also note that (for example) `ParseOptions::delimiter` would also be used when writing
   
   > I assume this is because the ParseOptions are used in its entirety, and for csv.ConvertOptions and csv.ReadOptions, only part of them is valid to set here? (another option could also be to check that those options are not set and if set raise an error to the user about that)
   
   That is indeed why I used only ParseOptions whole. It seemed better to me than documenting which fields are ignored or emitting errors when things aren't defaulted.
   
   > Do we want to allow reading csv files without a header line with column names?
   
   I can add `column_names` and `autogenerate*` too if required, but maybe that could wait for a follow up? Since the way I handled `/{Convert,Read}Options/` is evidently not transparent maybe everything except ParseOptions should wait as well?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] nealrichardson commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-621263610


   I'll fix the R failures in the next couple of hours (docs need revision).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415949596



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       Also, the `ScanOptions` should be used to set the `column_names`, `column_types` and `include_columns` options. Unless you prefer to let the CSV reader infer types and then convert them yourself?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r418053225



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  options.null_values = format.null_values;
+  options.true_values = format.true_values;
+  options.false_values = format.false_values;
+  options.strings_can_be_null = format.strings_can_be_null;
+  options.auto_dict_encode = format.auto_dict_encode;
+  options.auto_dict_max_cardinality = format.auto_dict_max_cardinality;
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.

Review comment:
       Parquet as a format does not require chunking; it is already subdivided into rowgroups. We simply wrap each of those in scan tasks and in this way scan individual files in parallel.
   
   There are several ways we could augment CSV to support something like this: exposing a task interface as I mentioned, including a sidecar file with cached chunk start offsets, using `csv::Chunker` directly when generating `ScanTask`s (so each `ScanTask` wraps a single independently parseable  chunk), ...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] nealrichardson commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r414852277



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to Csv files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

Review comment:
       How do I provide these options? `auto fmt = std::make_shared<ds::CsvFileFormat>();` and then like `fmt.parse_options = something`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] nealrichardson commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-619213038


   @github-actions rebase


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415958451



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to Csv files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

Review comment:
       Definitely you can set a full ParseOptions and ConvertOptions. I haven't exposed any of ReadOptions, as noted below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r418109100



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.
+  options.use_threads = false;
+  return options;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format,
+    const std::shared_ptr<ScanOptions>& options = nullptr,
+    MemoryPool* pool = default_memory_pool()) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto reader_options = GetReadOptions(format);
+  const auto& parse_options = format.parse_options;
+  ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options));
+  auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
+                                                 parse_options, convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open CSV input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader,
+                          OpenReader(source_, *format_, options(), context()->pool));
+    return IteratorFromReader(std::move(reader));
+  }
+
+ private:
+  std::shared_ptr<const CsvFileFormat> format_;
+  FileSource source_;
+};
+
+Result<bool> CsvFileFormat::IsSupported(const FileSource& source) const {
+  RETURN_NOT_OK(source.Open().status());

Review comment:
       If source fails to open (for example if it points to a file which doesn't exist) that should raise an error rather than returning false, which is what this line detects.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-621814564


   I'm happy to implement whatever configuration is agreeable. I'll add a list of the approaches which have been discussed here to the follow-up so we can discuss them there.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416253903



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       I'll add a failing test tomorrow so this can become less hand wavy




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416235260



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       Reading using a predetermined schema is already supported for CSV and for all the other formats. The first chunk (as determined by `CsvFileFormat::chunk_size`) is used to sniff the schema which is assumed for the rest of the file, and this inspection can be applied to just one or to all the files in a dataset at discovery time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416093398



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       That sounds nasty indeed. The datasets layer doesn't know the type of these columns at all?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r418108005



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.
+  options.use_threads = false;
+  return options;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format,
+    const std::shared_ptr<ScanOptions>& options = nullptr,

Review comment:
       Not sure what you mean




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415971888



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       Assembling these from `ScanOptions` is definitely a goal; the first pass just defers all this to the format. Some of `ConvertOptions` belong in the format (`null_values`, ..., `strings_can_be_null`, ...) while others such as the ones you note do not. I'll rewrite shortly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] nealrichardson commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-620831482


   @github-actions rebase


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#issuecomment-621795314


   Fine to leave the other options as a follow-up. We will want to add them at some point of course. Another option could also be to expose all keyword as direct options (also the ParseOptions keywords), to avoid the discrepancy between ParseOptions as struct vs single options for the others.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416941674



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       https://github.com/apache/arrow/pull/7033/commits/37c493695cb5603b7836291985e6355c3cf1de82




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r417559116



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       Follow up JIRA: https://issues.apache.org/jira/browse/ARROW-8630




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416097760



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       The type of field "a" can't be known without the schema from which it originates. When the filter is applied to a dataset in python or R the dataset's schema will be used to add cast expressions to the filter which will handle the necessary conversions. However in c++ we don't pass the dataset schema at scan time




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415981809



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),

Review comment:
       - not currently implemented, will add




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r414856596



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+
+namespace arrow {
+namespace dataset {
+
+/// \brief A FileFormat implementation that reads from and writes to Csv files
+class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
+ public:
+  csv::ParseOptions parse_options = csv::ParseOptions::Defaults();

Review comment:
       fmt->parse_options.delimiter = '\t'
   
   For example




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r417324952



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  options.null_values = format.null_values;
+  options.true_values = format.true_values;
+  options.false_values = format.false_values;
+  options.strings_can_be_null = format.strings_can_be_null;
+  options.auto_dict_encode = format.auto_dict_encode;
+  options.auto_dict_max_cardinality = format.auto_dict_max_cardinality;
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.

Review comment:
       Multithreaded reading of a single CSV file without excessive contention in the many file case would require refactoring the CSV readers to expose their parallelism as tasks which could be productively scheduled alongside the `ScanTask`s. That's a significant change and should certainly wait for a follow up.
   
   Even after we have such an API, the decision to use threading or not would be derived from scan parameters (see `ScannerBuilder::UseThreads`) and so it doesn't belong in the format.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
wesm commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416152095



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       In database systems, it is required to use a pre-determined schema when reading CSV files. If we want to support schema inference, we could use a part of a file to "sniff" the schema but then assume it going forward. That's a bit better than forcing people to write down the schema




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] fsaintjacques commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r415798798



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",

Review comment:
       ```suggestion
       return maybe_reader.status().WithMessage("Could not open CSV input source '",
   ```

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;

Review comment:
       This warrants a comment explaining why we disable threading.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       It's missing the column selection optimisation. While it won't save the IO, it'll save the memory and cpu time from elided  conversion.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),

Review comment:
       - How would you pass the Scan's memory pool?
   - We'll need a "merge" ReadOptions like we do for parquet. See other comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r418111903



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;

Review comment:
       This is just to stop conversion from erroring if a column is projected but absent from the file. Another way to handle this would be: restrict include_colums to columns present in the file then let the projector handle the rest as you say, but that would require knowledge of the file's columns which we don't have at this stage




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416099536



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       So if e.g. two Parquet files in a dataset have different types for `a` or `b` (which may be a data error), that passes silently?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r417955841



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline Result<csv::ConvertOptions> GetConvertOptions(
+    const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) {
+  auto options = csv::ConvertOptions::Defaults();
+  options.null_values = format.null_values;
+  options.true_values = format.true_values;
+  options.false_values = format.false_values;
+  options.strings_can_be_null = format.strings_can_be_null;
+  options.auto_dict_encode = format.auto_dict_encode;
+  options.auto_dict_max_cardinality = format.auto_dict_max_cardinality;
+  if (scan_options != nullptr) {
+    // This is set to true to match behavior with other formats; a missing column
+    // will be materialized as null.
+    options.include_missing_columns = true;
+
+    for (const auto& field : scan_options->schema()->fields()) {
+      options.column_types[field->name()] = field->type();
+      options.include_columns.push_back(field->name());
+    }
+
+    // FIXME(bkietz) also acquire types of fields materialized but not projected.
+    for (auto&& name : FieldsInExpression(scan_options->filter)) {
+      ARROW_ASSIGN_OR_RAISE(auto match,
+                            FieldRef(name).FindOneOrNone(*scan_options->schema()));
+      if (match.indices().empty()) {
+        options.include_columns.push_back(std::move(name));
+      }
+    }
+  }
+  return options;
+}
+
+static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
+  auto options = csv::ReadOptions::Defaults();
+  // Multithreaded conversion of individual files would lead to excessive thread
+  // contention when ScanTasks are also executed in multiple threads, so we disable it
+  // here.

Review comment:
       For my education: the parquet reader has such APIs, and thus there this is not a problem?
   
   > use threading or not would be derived from scan parameters (see ScannerBuilder::UseThreads) and so it doesn't belong in the format.
   
   Yep, that's true




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] wesm commented on a change in pull request #7033: ARROW-7759: [C++][Dataset] Add CsvFileFormat

Posted by GitBox <gi...@apache.org>.
wesm commented on a change in pull request #7033:
URL: https://github.com/apache/arrow/pull/7033#discussion_r416152095



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/file_csv.h"
+
+#include <memory>
+#include <string>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/dataset/dataset_internal.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/dataset/visibility.h"
+#include "arrow/result.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+namespace dataset {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+static inline csv::ReadOptions default_read_options() {
+  auto defaults = csv::ReadOptions::Defaults();
+  defaults.use_threads = false;
+  return defaults;
+}
+
+static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+    const FileSource& source, const CsvFileFormat& format) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+
+  auto maybe_reader = csv::StreamingReader::Make(
+      default_memory_pool(), std::move(input), default_read_options(),
+      format.parse_options, format.convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open IPC input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+
+  return std::move(maybe_reader).ValueOrDie();
+}
+
+/// \brief A ScanTask backed by an Csv file.
+class CsvScanTask : public ScanTask {
+ public:
+  CsvScanTask(std::shared_ptr<const CsvFileFormat> format, FileSource source,
+              std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
+      : ScanTask(std::move(options), std::move(context)),
+        format_(std::move(format)),
+        source_(std::move(source)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_));

Review comment:
       In most database systems, it is required to use a pre-determined schema when reading CSV files. If we want to support schema inference, we could use a part of a file to "sniff" the schema but then assume it going forward. That's a bit better than forcing people to write down the schema




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org