Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/22 14:06:26 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #14663: ARROW-17288: [C++] Adapt the CSV file format to the new scan API

lidavidm commented on code in PR #14663:
URL: https://github.com/apache/arrow/pull/14663#discussion_r1029343177


##########
cpp/src/arrow/compute/exec/expression.h:
##########
@@ -220,6 +220,8 @@ ARROW_EXPORT
 Result<Expression> SimplifyWithGuarantee(Expression,
                                          const Expression& guaranteed_true_predicate);
 
+ARROW_EXPORT Result<Expression> RemoveNamedRefs(Expression expression);

Review Comment:
   nit: add a docstring, unit tests?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -175,6 +177,7 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
   /// data from the plan.  If this function is not called frequently enough then the sink
   /// node will start to accumulate data and may apply backpressure.
   std::function<Future<std::optional<ExecBatch>>()>* generator;
+  std::shared_ptr<Schema>* schema;

Review Comment:
   nit: docstring?



##########
cpp/src/arrow/dataset/scanner.h:
##########
@@ -178,6 +178,13 @@ struct ARROW_DS_EXPORT ScanV2Options : public compute::ExecNodeOptions {
   ///
   /// A single guarantee-aware filtering operation should generally be applied to all
   /// resulting batches.  The scan node is not responsible for this.
+  ///
+  /// Fields that are referenced by the filter should be included in the `columns` vector.
+  /// The scan node will not automatically fetch fields referenced by the filter
+  /// expression. \see AddFieldsNeededForFilter
+  ///
+  /// If the filter references fields that are not included in `columns` this may or may
+  /// not be an error, depending on the format.

Review Comment:
   Hmm, how would this not be an error? (I suppose if the format just ignores the filter?)



##########
cpp/src/arrow/dataset/file_base.h:
##########
@@ -108,6 +115,9 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
   /// \brief Get a RandomAccessFile which views this file source
   Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
 
+  /// \brief Get the size (in bytes) of the file or buffer

Review Comment:
   With compression, is this the compressed or uncompressed size?



##########
cpp/src/arrow/dataset/scanner.h:
##########
@@ -243,10 +250,17 @@ struct ARROW_DS_EXPORT ScanV2Options : public compute::ExecNodeOptions {
   /// one fragment at a time.
   int32_t fragment_readahead = kDefaultFragmentReadahead;
   /// \brief Options specific to the file format
-  FragmentScanOptions* format_options;
+  const FragmentScanOptions* format_options = NULLPTR;
 
   /// \brief Utility method to get a selection representing all columns in a dataset
-  static std::vector<FieldPath> AllColumns(const Dataset& dataset);
+  static std::vector<FieldPath> AllColumns(const Schema& dataset_schema);
+
+  /// \brief Utility method to add fields needed for the current filter
+  ///
+  /// This method adds any fields that are needed by `filter` which are not already
+  /// included in the list of columns.  Any new fields added will be added to the end
+  /// in no particular order.
+  static Status AddFieldsNeededForFilter(ScanV2Options* options);

Review Comment:
   Why isn't this just a regular method?



##########
cpp/src/arrow/dataset/scanner.cc:
##########
@@ -68,14 +68,28 @@ std::vector<FieldRef> ScanOptions::MaterializedFields() const {
   return fields;
 }
 
-std::vector<FieldPath> ScanV2Options::AllColumns(const Dataset& dataset) {
-  std::vector<FieldPath> selection(dataset.schema()->num_fields());
-  for (std::size_t i = 0; i < selection.size(); i++) {
-    selection[i] = {static_cast<int>(i)};
+std::vector<FieldPath> ScanV2Options::AllColumns(const Schema& dataset_schema) {
+  std::vector<FieldPath> selection(dataset_schema.num_fields());
+  for (int i = 0; i < dataset_schema.num_fields(); i++) {
+    selection[i] = {i};
   }
   return selection;
 }
 
+Status ScanV2Options::AddFieldsNeededForFilter(ScanV2Options* options) {
+  std::vector<FieldRef> fields_referenced = FieldsInExpression(options->filter);
+  for (const auto& field : fields_referenced) {
+    // Note: this will fail if the field reference is ambiguous or the field doesn't
+    // exist in the dataset schema
+    ARROW_ASSIGN_OR_RAISE(auto field_path, field.FindOne(*options->dataset->schema()));

Review Comment:
   IIRC FindOne is linear in the number of schema fields, so this loop is quadratic overall when the filter references most of the columns. Might be worth considering (as a follow-up) how this pattern behaves on very wide datasets (1000-10000 columns), since we've gotten reports of issues at that size before.
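
To illustrate the concern: resolving F filter fields with a linear search over N columns costs O(F*N), which approaches quadratic when the filter touches most columns. One possible mitigation is to build a name-to-index table once and then use hashed lookups. The sketch below is only an illustration under simplifying assumptions: `FieldNames`, `BuildNameIndex`, and `ResolveAll` are hypothetical names (not Arrow APIs), and only top-level field names are handled, not nested references.

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a schema: the ordered list of top-level field names.
using FieldNames = std::vector<std::string>;

// Build a name -> index table in a single pass over the schema.
// A name that occurs more than once is recorded as -1 (ambiguous).
std::unordered_map<std::string, int> BuildNameIndex(const FieldNames& names) {
  std::unordered_map<std::string, int> index;
  index.reserve(names.size());
  for (int i = 0; i < static_cast<int>(names.size()); ++i) {
    auto [it, inserted] = index.emplace(names[i], i);
    if (!inserted) it->second = -1;  // duplicate name: mark as ambiguous
  }
  return index;
}

// Resolve every referenced name with one hashed lookup each.
// Returns std::nullopt if any name is missing or ambiguous.
std::optional<std::vector<int>> ResolveAll(
    const std::unordered_map<std::string, int>& index,
    const std::vector<std::string>& refs) {
  std::vector<int> resolved;
  resolved.reserve(refs.size());
  for (const auto& name : refs) {
    auto it = index.find(name);
    if (it == index.end() || it->second < 0) return std::nullopt;
    resolved.push_back(it->second);
  }
  return resolved;
}
```

With the table built once, resolving all references takes expected O(N + F) time instead of O(F*N). Whether this matters in practice would need measuring on a wide dataset.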



##########
cpp/src/arrow/dataset/scanner.h:
##########
@@ -243,10 +250,17 @@ struct ARROW_DS_EXPORT ScanV2Options : public compute::ExecNodeOptions {
   /// one fragment at a time.
   int32_t fragment_readahead = kDefaultFragmentReadahead;
   /// \brief Options specific to the file format
-  FragmentScanOptions* format_options;
+  const FragmentScanOptions* format_options = NULLPTR;
 
   /// \brief Utility method to get a selection representing all columns in a dataset
-  static std::vector<FieldPath> AllColumns(const Dataset& dataset);
+  static std::vector<FieldPath> AllColumns(const Schema& dataset_schema);
+
+  /// \brief Utility method to add fields needed for the current filter
+  ///
+  /// This method adds any fields that are needed by `filter` which are not already
+  /// included in the list of columns.  Any new fields added will be added to the end
+  /// in no particular order.
+  static Status AddFieldsNeededForFilter(ScanV2Options* options);

Review Comment:
   Also, when would you ever _not_ want this? I suppose it's separate because the filter isn't provided upon construction - maybe we instead want a `Validate` method or something that can update state like this and check any other invariants?
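
A rough sketch of what such a `Validate` step could look like, assuming a heavily simplified options struct. `ScanOptionsSketch` and its members are hypothetical stand-ins rather than the real `ScanV2Options`, fields are identified by plain names instead of `FieldRef`/`FieldPath`, and the range check on `columns` is just an example invariant, not one taken from the PR.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the scan options (not the Arrow type).
struct ScanOptionsSketch {
  std::vector<std::string> schema_fields;  // top-level names in the dataset schema
  std::vector<std::string> filter_fields;  // names referenced by the filter
  std::vector<int> columns;                // selected column indices

  // Checks invariants and adds any filter fields missing from `columns`.
  // Returns an empty string on success, otherwise an error message.
  std::string Validate() {
    const int num_fields = static_cast<int>(schema_fields.size());
    // Example invariant (an assumption): every selected index is in range.
    for (int idx : columns) {
      if (idx < 0 || idx >= num_fields) return "column index out of range";
    }
    for (const auto& name : filter_fields) {
      auto it = std::find(schema_fields.begin(), schema_fields.end(), name);
      if (it == schema_fields.end()) {
        return "filter references unknown field: " + name;
      }
      const int idx = static_cast<int>(it - schema_fields.begin());
      // Append at the end only if the column is not already selected.
      if (std::find(columns.begin(), columns.end(), idx) == columns.end()) {
        columns.push_back(idx);
      }
    }
    return "";
  }
};
```

Calling a single method like this after the filter is set would keep the "add fields needed for the filter" step from being forgotten, at the cost of making validation mutate the options.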


