You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2020/06/26 19:51:15 UTC

[arrow] branch master updated: ARROW-9146: [C++][Dataset] Lazily store fragment physical schema

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 975f4eb  ARROW-9146: [C++][Dataset] Lazily store fragment physical schema
975f4eb is described below

commit 975f4eb8e6dd0e6062e92958adf1a79b838ddba8
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Fri Jun 26 15:50:38 2020 -0400

    ARROW-9146: [C++][Dataset] Lazily store fragment physical schema
    
    The physical schema is required to validate predicates used for filtering row groups based on statistics.
    
    It can also be explicitly provided to ensure that, if no row groups satisfy the predicate, no I/O is done.
    
    Closes #7526 from bkietz/9146-Scanning-a-Fragment-with-
    
    Lead-authored-by: Benjamin Kietzman <be...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: François Saint-Jacques <fs...@gmail.com>
---
 cpp/src/arrow/CMakeLists.txt                 |  1 +
 cpp/src/arrow/dataset/dataset.cc             | 25 ++++++--
 cpp/src/arrow/dataset/dataset.h              | 16 +++--
 cpp/src/arrow/dataset/dataset_test.cc        |  2 +-
 cpp/src/arrow/dataset/file_base.cc           | 20 +++++--
 cpp/src/arrow/dataset/file_base.h            | 25 +++++---
 cpp/src/arrow/dataset/file_csv.cc            | 10 ++--
 cpp/src/arrow/dataset/file_csv.h             |  6 +-
 cpp/src/arrow/dataset/file_ipc.cc            |  9 +--
 cpp/src/arrow/dataset/file_ipc.h             |  6 +-
 cpp/src/arrow/dataset/file_parquet.cc        | 71 ++++++++++++----------
 cpp/src/arrow/dataset/file_parquet.h         | 27 ++++-----
 cpp/src/arrow/dataset/file_parquet_test.cc   |  5 +-
 cpp/src/arrow/dataset/filter.cc              |  4 ++
 cpp/src/arrow/dataset/test_util.h            | 16 ++---
 cpp/src/arrow/util/mutex.cc                  | 47 +++++++++++++++
 cpp/src/arrow/util/mutex.h                   | 60 +++++++++++++++++++
 python/pyarrow/_dataset.pyx                  | 12 ++--
 python/pyarrow/includes/libarrow_dataset.pxd |  9 ++-
 python/pyarrow/tests/test_dataset.py         | 90 ++++++++++++++++++++++++++++
 20 files changed, 354 insertions(+), 107 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index fe5ad8f..19ebe0e 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -193,6 +193,7 @@ set(ARROW_SRCS
     util/logging.cc
     util/key_value_metadata.cc
     util/memory.cc
+    util/mutex.cc
     util/string.cc
     util/string_builder.cc
     util/task_group.cc
diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index 3173c31..ed936db 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -32,19 +32,32 @@
 namespace arrow {
 namespace dataset {
 
-Fragment::Fragment(std::shared_ptr<Expression> partition_expression)
-    : partition_expression_(std::move(partition_expression)) {
+Fragment::Fragment(std::shared_ptr<Expression> partition_expression,
+                   std::shared_ptr<Schema> physical_schema)
+    : partition_expression_(std::move(partition_expression)),
+      physical_schema_(std::move(physical_schema)) {
   DCHECK_NE(partition_expression_, nullptr);
 }
 
-Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchema() { return schema_; }
+Result<std::shared_ptr<Schema>> Fragment::ReadPhysicalSchema() {
+  auto lock = physical_schema_mutex_.Lock();
+  if (physical_schema_ == NULLPTR) {
+    ARROW_ASSIGN_OR_RAISE(physical_schema_, ReadPhysicalSchemaImpl());
+  }
+  return physical_schema_;
+}
+
+Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchemaImpl() {
+  return physical_schema_;
+}
 
 InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> schema,
                                    RecordBatchVector record_batches,
                                    std::shared_ptr<Expression> partition_expression)
-    : Fragment(std::move(partition_expression)),
-      schema_(std::move(schema)),
-      record_batches_(std::move(record_batches)) {}
+    : Fragment(std::move(partition_expression), std::move(schema)),
+      record_batches_(std::move(record_batches)) {
+  DCHECK_NE(physical_schema_, nullptr);
+}
 
 InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
                                    std::shared_ptr<Expression> partition_expression)
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index bd77192..be138dd 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -28,6 +28,7 @@
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/dataset/visibility.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/mutex.h"
 
 namespace arrow {
 namespace dataset {
@@ -47,7 +48,8 @@ class ARROW_DS_EXPORT Fragment {
   ///
   /// The physical schema is also called the writer schema.
   /// This method is blocking and may suffer from high latency filesystem.
-  virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchema() = 0;
+  /// The schema is cached after being read once, or may be specified at construction.
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
 
   /// \brief Scan returns an iterator of ScanTasks, each of which yields
   /// RecordBatches from this Fragment.
@@ -77,9 +79,14 @@ class ARROW_DS_EXPORT Fragment {
 
  protected:
   Fragment() = default;
-  explicit Fragment(std::shared_ptr<Expression> partition_expression);
+  explicit Fragment(std::shared_ptr<Expression> partition_expression,
+                    std::shared_ptr<Schema> physical_schema);
 
+  virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
+
+  util::Mutex physical_schema_mutex_;
   std::shared_ptr<Expression> partition_expression_ = scalar(true);
+  std::shared_ptr<Schema> physical_schema_;
 };
 
 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
@@ -91,8 +98,6 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
   explicit InMemoryFragment(RecordBatchVector record_batches,
                             std::shared_ptr<Expression> = scalar(true));
 
-  Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override;
-
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                 std::shared_ptr<ScanContext> context) override;
 
@@ -101,7 +106,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
   std::string type_name() const override { return "in-memory"; }
 
  protected:
-  std::shared_ptr<Schema> schema_;
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
+
   RecordBatchVector record_batches_;
 };
 
diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index 4be6e33..67145a9 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -44,7 +44,7 @@ TEST_F(TestInMemoryFragment, Scan) {
   auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
 
   // Creates a InMemoryFragment of the same repeated batch.
-  auto fragment = InMemoryFragment({static_cast<size_t>(kNumberBatches), batch});
+  InMemoryFragment fragment({static_cast<size_t>(kNumberBatches), batch});
 
   AssertFragmentEquals(reader.get(), &fragment);
 }
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index a39289d..7f2d8f4 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -54,14 +54,22 @@ Result<std::shared_ptr<arrow::io::OutputStream>> WritableFileSource::Open() cons
   return std::make_shared<::arrow::io::BufferOutputStream>(buffer_);
 }
 
-Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(FileSource source) {
-  return MakeFragment(std::move(source), scalar(true));
+Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Schema> physical_schema) {
+  return MakeFragment(std::move(source), scalar(true), std::move(physical_schema));
 }
 
 Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression) {
-  return std::shared_ptr<FileFragment>(new FileFragment(
-      std::move(source), shared_from_this(), std::move(partition_expression)));
+  return MakeFragment(std::move(source), std::move(partition_expression), nullptr);
+}
+
+Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::shared_ptr<Schema> physical_schema) {
+  return std::shared_ptr<FileFragment>(
+      new FileFragment(std::move(source), shared_from_this(),
+                       std::move(partition_expression), std::move(physical_schema)));
 }
 
 Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
@@ -71,13 +79,13 @@ Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
   return Status::NotImplemented("writing fragment of format ", type_name());
 }
 
-Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchema() {
+Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
   return format_->Inspect(source_);
 }
 
 Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options,
                                             std::shared_ptr<ScanContext> context) {
-  return format_->ScanFile(source_, std::move(options), std::move(context));
+  return format_->ScanFile(std::move(options), std::move(context), this);
 }
 
 FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 466ed0f..d822a8a 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -167,16 +167,22 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
   /// \brief Return the schema of the file if possible.
   virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const = 0;
 
-  /// \brief Open a file for scanning
-  virtual Result<ScanTaskIterator> ScanFile(
-      const FileSource& source, std::shared_ptr<ScanOptions> options,
-      std::shared_ptr<ScanContext> context) const = 0;
+  /// \brief Open a FileFragment for scanning.
+  /// May populate lazy properties of the FileFragment.
+  virtual Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
+                                            std::shared_ptr<ScanContext> context,
+                                            FileFragment* file) const = 0;
 
   /// \brief Open a fragment
   virtual Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::shared_ptr<Schema> physical_schema);
+
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression);
 
-  Result<std::shared_ptr<FileFragment>> MakeFragment(FileSource source);
+  Result<std::shared_ptr<FileFragment>> MakeFragment(
+      FileSource source, std::shared_ptr<Schema> physical_schema = NULLPTR);
 
   /// \brief Write a fragment. If the parent directory of destination does not exist, it
   /// will be created.
@@ -189,8 +195,6 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
 /// \brief A Fragment that is stored in a file with a known format
 class ARROW_DS_EXPORT FileFragment : public Fragment {
  public:
-  Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override;
-
   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                 std::shared_ptr<ScanContext> context) override;
 
@@ -202,11 +206,14 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
 
  protected:
   FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
-               std::shared_ptr<Expression> partition_expression)
-      : Fragment(std::move(partition_expression)),
+               std::shared_ptr<Expression> partition_expression,
+               std::shared_ptr<Schema> physical_schema)
+      : Fragment(std::move(partition_expression), std::move(physical_schema)),
         source_(std::move(source)),
         format_(std::move(format)) {}
 
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
+
   FileSource source_;
   std::shared_ptr<FileFormat> format_;
 
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index c6fbf93..f077ea2 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -122,12 +122,12 @@ Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source)
   return reader->schema();
 }
 
-Result<ScanTaskIterator> CsvFileFormat::ScanFile(
-    const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context) const {
+Result<ScanTaskIterator> CsvFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
+                                                 std::shared_ptr<ScanContext> context,
+                                                 FileFragment* fragment) const {
   auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
-  auto task = std::make_shared<CsvScanTask>(std::move(this_), source, std::move(options),
-                                            std::move(context));
+  auto task = std::make_shared<CsvScanTask>(std::move(this_), fragment->source(),
+                                            std::move(options), std::move(context));
 
   return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
 }
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
index 9c2bd08..6777d8a 100644
--- a/cpp/src/arrow/dataset/file_csv.h
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -45,9 +45,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
-                                    std::shared_ptr<ScanContext> context) const override;
+  Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context,
+                                    FileFragment* fragment) const override;
 };
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index f080a25..63f3020 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -152,10 +152,11 @@ Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source)
   return reader->schema();
 }
 
-Result<ScanTaskIterator> IpcFileFormat::ScanFile(
-    const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context) const {
-  return IpcScanTaskIterator::Make(options, context, source);
+Result<ScanTaskIterator> IpcFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
+                                                 std::shared_ptr<ScanContext> context,
+                                                 FileFragment* fragment) const {
+  return IpcScanTaskIterator::Make(std::move(options), std::move(context),
+                                   fragment->source());
 }
 
 class IpcWriteTask : public WriteTask {
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index 70db5e9..ad18354 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -43,9 +43,9 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
-                                    std::shared_ptr<ScanContext> context) const override;
+  Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context,
+                                    FileFragment* fragment) const override;
 
   Result<std::shared_ptr<WriteTask>> WriteFragment(
       WritableFileSource destination, std::shared_ptr<Fragment> fragment,
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index c02f084..ffb6979 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -28,6 +28,7 @@
 #include "arrow/dataset/scanner.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/table.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/range.h"
@@ -38,6 +39,9 @@
 #include "parquet/statistics.h"
 
 namespace arrow {
+
+using internal::checked_cast;
+
 namespace dataset {
 
 using parquet::arrow::SchemaField;
@@ -325,12 +329,6 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
   return std::move(arrow_reader);
 }
 
-Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
-    const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context) const {
-  return ScanFile(source, std::move(options), std::move(context), {});
-}
-
 static inline bool RowGroupInfosAreComplete(const std::vector<RowGroupInfo>& infos) {
   return !infos.empty() &&
          std::all_of(infos.cbegin(), infos.cend(),
@@ -370,13 +368,20 @@ static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
   return row_groups;
 }
 
-Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
-    const FileSource& source, std::shared_ptr<ScanOptions> options,
-    std::shared_ptr<ScanContext> context, std::vector<RowGroupInfo> row_groups) const {
+Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
+                                                     std::shared_ptr<ScanContext> context,
+                                                     FileFragment* fragment) const {
+  const auto& source = fragment->source();
+  auto row_groups = checked_cast<const ParquetFileFragment*>(fragment)->row_groups();
+
   bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
   // The following block is required to avoid any IO if all RowGroups are
   // excluded due to prior statistics knowledge.
   if (row_groups_are_complete) {
+    // physical_schema should be cached at this point
+    ARROW_ASSIGN_OR_RAISE(auto physical_schema, fragment->ReadPhysicalSchema());
+    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
+
     // Apply a pre-filtering if the user requested an explicit sub-set of
     // row-groups. In the case where a RowGroup doesn't have statistics
     // metdata, it will not be excluded.
@@ -402,6 +407,9 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   if (!row_groups_are_complete) {
     ARROW_ASSIGN_OR_RAISE(row_groups,
                           AugmentRowGroups(std::move(row_groups), reader.get()));
+    std::shared_ptr<Schema> physical_schema;
+    RETURN_NOT_OK(reader->GetSchema(&physical_schema));
+    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
     row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
   }
 
@@ -415,24 +423,18 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
     FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<RowGroupInfo> row_groups) {
-  return std::shared_ptr<FileFragment>(
-      new ParquetFileFragment(std::move(source), shared_from_this(),
-                              std::move(partition_expression), std::move(row_groups)));
-}
-
-Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
-    FileSource source, std::shared_ptr<Expression> partition_expression,
-    std::vector<int> row_groups) {
+    std::vector<RowGroupInfo> row_groups, std::shared_ptr<Schema> physical_schema) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
       std::move(source), shared_from_this(), std::move(partition_expression),
-      RowGroupInfo::FromIdentifiers(row_groups)));
+      std::move(physical_schema), std::move(row_groups)));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
-    FileSource source, std::shared_ptr<Expression> partition_expression) {
+    FileSource source, std::shared_ptr<Expression> partition_expression,
+    std::shared_ptr<Schema> physical_schema) {
   return std::shared_ptr<FileFragment>(new ParquetFileFragment(
-      std::move(source), shared_from_this(), std::move(partition_expression), {}));
+      std::move(source), shared_from_this(), std::move(partition_expression),
+      std::move(physical_schema), {}));
 }
 
 ///
@@ -499,18 +501,14 @@ bool RowGroupInfo::Satisfy(const Expression& predicate) const {
 ParquetFileFragment::ParquetFileFragment(FileSource source,
                                          std::shared_ptr<FileFormat> format,
                                          std::shared_ptr<Expression> partition_expression,
+                                         std::shared_ptr<Schema> physical_schema,
                                          std::vector<RowGroupInfo> row_groups)
-    : FileFragment(std::move(source), std::move(format), std::move(partition_expression)),
+    : FileFragment(std::move(source), std::move(format), std::move(partition_expression),
+                   std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
-      parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)),
+      parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
       has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
 
-Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
-                                                   std::shared_ptr<ScanContext> context) {
-  return parquet_format_.ScanFile(source_, std::move(options), std::move(context),
-                                  row_groups_);
-}
-
 Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
     const std::shared_ptr<Expression>& predicate) {
   auto simplified_predicate = predicate->Assume(partition_expression());
@@ -648,6 +646,12 @@ Result<std::vector<std::string>> ParquetDatasetFactory::CollectPaths(
   }
 }
 
+Result<std::shared_ptr<Schema>> GetSchema(const parquet::FileMetaData& metadata) {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata.schema(), &schema));
+  return schema;
+}
+
 Result<std::vector<std::shared_ptr<FileFragment>>>
 ParquetDatasetFactory::CollectParquetFragments(
     const parquet::FileMetaData& metadata,
@@ -682,6 +686,7 @@ ParquetDatasetFactory::CollectParquetFragments(
       }
     }
 
+    ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(metadata));
     std::vector<std::shared_ptr<FileFragment>> fragments;
     fragments.reserve(path_to_row_group_infos.size());
     for (auto&& elem : path_to_row_group_infos) {
@@ -691,7 +696,7 @@ ParquetDatasetFactory::CollectParquetFragments(
               .ValueOr(scalar(true));
       ARROW_ASSIGN_OR_RAISE(
           auto fragment, format_->MakeFragment({path, filesystem_}, std::move(partition),
-                                               std::move(elem.second)));
+                                               std::move(elem.second), physical_schema));
       fragments.push_back(std::move(fragment));
     }
 
@@ -705,9 +710,9 @@ Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchem
     InspectOptions options) {
   std::vector<std::shared_ptr<Schema>> schemas;
 
-  std::shared_ptr<Schema> schema;
-  RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema));
-  schemas.push_back(std::move(schema));
+  // The physical_schema from the _metadata file is always yielded
+  ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(*metadata_));
+  schemas.push_back(std::move(physical_schema));
 
   if (options_.partitioning.factory() != nullptr) {
     // Gather paths found in RowGroups' ColumnChunks.
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 182280e..546ceac 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -95,30 +95,22 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
-                                    std::shared_ptr<ScanContext> context) const override;
-
-  /// \brief Open a file for scanning, restricted to the specified row groups.
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
+  Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
                                     std::shared_ptr<ScanContext> context,
-                                    std::vector<RowGroupInfo> row_groups) const;
+                                    FileFragment* file) const override;
 
   using FileFormat::MakeFragment;
 
   /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
       FileSource source, std::shared_ptr<Expression> partition_expression,
-      std::vector<RowGroupInfo> row_groups);
-
-  Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression,
-      std::vector<int> row_groups);
+      std::vector<RowGroupInfo> row_groups,
+      std::shared_ptr<Schema> physical_schema = NULLPTR);
 
   /// \brief Create a Fragment targeting all RowGroups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::shared_ptr<Schema> physical_schema) override;
 
   /// \brief Return a FileReader on the given source.
   Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
@@ -196,9 +188,6 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInf
 /// significant performance boost when scanning high latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
-  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
-                                std::shared_ptr<ScanContext> context) override;
-
   Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& predicate);
 
   /// \brief Return the RowGroups selected by this fragment. An empty list
@@ -214,8 +203,12 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
+                      std::shared_ptr<Schema> physical_schema,
                       std::vector<RowGroupInfo> row_groups);
 
+  // TODO(bkietz) override ReadPhysicalSchemaImpl to augment row_groups_
+  // while a reader is opened anyway
+
   std::vector<RowGroupInfo> row_groups_;
   ParquetFileFormat& parquet_format_;
   bool has_complete_metadata_;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 8fe6b67..2a0b919 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -501,8 +501,11 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
   opts_ = ScanOptions::Make(reader->schema());
 
   auto row_groups_fragment = [&](std::vector<int> row_groups) {
+    std::shared_ptr<Schema> physical_schema = nullptr;
     EXPECT_OK_AND_ASSIGN(auto fragment,
-                         format_->MakeFragment(*source, scalar(true), row_groups));
+                         format_->MakeFragment(*source, scalar(true),
+                                               RowGroupInfo::FromIdentifiers(row_groups),
+                                               physical_schema));
     return internal::checked_pointer_cast<ParquetFileFragment>(fragment);
   };
 
diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc
index 9a46fc2..780ad29 100644
--- a/cpp/src/arrow/dataset/filter.cc
+++ b/cpp/src/arrow/dataset/filter.cc
@@ -870,6 +870,10 @@ Result<std::shared_ptr<DataType>> NotExpression::Validate(const Schema& schema)
 
 Result<std::shared_ptr<DataType>> InExpression::Validate(const Schema& schema) const {
   ARROW_ASSIGN_OR_RAISE(auto operand_type, operand_->Validate(schema));
+  if (operand_type->id() == Type::NA || set_->type()->id() == Type::NA) {
+    return boolean();
+  }
+
   if (!operand_type->Equals(set_->type())) {
     return Status::TypeError("mismatch: set type ", *set_->type(), " vs operand type ",
                              *operand_type);
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 26a58fb..76e3c1f 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -203,9 +203,9 @@ class DummyFileFormat : public FileFormat {
   }
 
   /// \brief Open a file for scanning (always returns an empty iterator)
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
-                                    std::shared_ptr<ScanContext> context) const override {
+  Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context,
+                                    FileFragment* fragment) const override {
     return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
   }
 
@@ -233,16 +233,16 @@ class JSONRecordBatchFileFormat : public FileFormat {
   }
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(const FileSource& source,
-                                    std::shared_ptr<ScanOptions> options,
-                                    std::shared_ptr<ScanContext> context) const override {
-    ARROW_ASSIGN_OR_RAISE(auto file, source.Open());
+  Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
+                                    std::shared_ptr<ScanContext> context,
+                                    FileFragment* fragment) const override {
+    ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open());
     ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
     ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(size));
 
     util::string_view view{*buffer};
 
-    ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(source));
+    ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source()));
     std::shared_ptr<RecordBatch> batch = RecordBatchFromJSON(schema, view);
     return ScanTaskIteratorFromRecordBatch({batch}, std::move(options),
                                            std::move(context));
diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc
new file mode 100644
index 0000000..fa900fd
--- /dev/null
+++ b/cpp/src/arrow/util/mutex.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/mutex.h"
+
+#include <mutex>
+
+namespace arrow {
+namespace util {
+
+struct Mutex::Impl {
+  std::mutex mutex_;
+};
+
+Mutex::Guard::Guard(Mutex* locked)
+    : locked_(locked, [](Mutex* locked) { locked->impl_->mutex_.unlock(); }) {}
+
+Mutex::Guard Mutex::TryLock() {
+  if (impl_->mutex_.try_lock()) {
+    return Guard{this};
+  }
+  return Guard{};
+}
+
+Mutex::Guard Mutex::Lock() {
+  impl_->mutex_.lock();
+  return Guard{this};
+}
+
+Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}
+
+}  // namespace util
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h
new file mode 100644
index 0000000..a0365b7
--- /dev/null
+++ b/cpp/src/arrow/util/mutex.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace util {
+
+/// A wrapper around std::mutex since we can't use it directly in
+/// public headers due to C++/CLI.
+/// https://docs.microsoft.com/en-us/cpp/standard-library/mutex#remarks
+class ARROW_EXPORT Mutex {
+ public:
+  Mutex();
+
+  /// A Guard is falsy if a lock could not be acquired.
+  class Guard {
+   public:
+    Guard() : locked_(NULLPTR, [](Mutex* mutex) {}) {}
+    Guard(Guard&&) = default;
+    Guard& operator=(Guard&&) = default;
+
+    explicit operator bool() const { return bool(locked_); }
+
+   private:
+    explicit Guard(Mutex* locked);
+
+    std::unique_ptr<Mutex, void (*)(Mutex*)> locked_;
+    friend Mutex;
+  };
+
+  Guard TryLock();
+  Guard Lock();
+
+ private:
+  struct Impl;
+  std::unique_ptr<Impl, void (*)(Impl*)> impl_;
+};
+
+}  // namespace util
+}  // namespace arrow
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 8047c5f..65d10ba 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -624,7 +624,8 @@ cdef class FileFormat:
         c_source = _make_file_source(file, filesystem)
         c_fragment = <shared_ptr[CFragment]> GetResultValue(
             self.format.MakeFragment(move(c_source),
-                                     partition_expression.unwrap()))
+                                     partition_expression.unwrap(),
+                                     <shared_ptr[CSchema]>nullptr))
         return Fragment.wrap(move(c_fragment))
 
     def __eq__(self, other):
@@ -1025,7 +1026,8 @@ cdef class ParquetFileFormat(FileFormat):
     def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None, row_groups=None):
         cdef:
-            vector[int] c_row_groups
+            vector[int] c_row_group_ids
+            vector[CRowGroupInfo] c_row_groups
 
         partition_expression = partition_expression or _true
 
@@ -1034,12 +1036,14 @@ cdef class ParquetFileFormat(FileFormat):
                                          partition_expression)
 
         c_source = _make_file_source(file, filesystem)
-        c_row_groups = [<int> row_group for row_group in set(row_groups)]
+        c_row_group_ids = [<int> row_group for row_group in set(row_groups)]
+        c_row_groups = CRowGroupInfo.FromIdentifiers(move(c_row_group_ids))
 
         c_fragment = <shared_ptr[CFragment]> GetResultValue(
             self.parquet_format.MakeFragment(move(c_source),
                                              partition_expression.unwrap(),
-                                             move(c_row_groups)))
+                                             move(c_row_groups),
+                                             <shared_ptr[CSchema]>nullptr))
         return Fragment.wrap(move(c_fragment))
 
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 6823bd9..834fa6f 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -206,7 +206,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const
         CResult[shared_ptr[CFileFragment]] MakeFragment(
             CFileSource source,
-            shared_ptr[CExpression] partition_expression)
+            shared_ptr[CExpression] partition_expression,
+            shared_ptr[CSchema] physical_schema)
 
     cdef cppclass CFileFragment "arrow::dataset::FileFragment"(
             CFragment):
@@ -222,6 +223,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         c_bool HasStatistics() const
         shared_ptr[CStructScalar] statistics() const
 
+        @staticmethod
+        vector[CRowGroupInfo] FromIdentifiers(vector[int])
+
     cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
             CFileFragment):
         const vector[CRowGroupInfo]& row_groups() const
@@ -252,7 +256,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[shared_ptr[CFileFragment]] MakeFragment(
             CFileSource source,
             shared_ptr[CExpression] partition_expression,
-            vector[int] row_groups)
+            vector[CRowGroupInfo] row_groups,
+            shared_ptr[CSchema] physical_schema)
 
     cdef cppclass CIpcFileFormat "arrow::dataset::IpcFileFormat"(
             CFileFormat):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 2a59f62..738924b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -124,6 +124,38 @@ def mockfs():
     return mockfs
 
 
+@pytest.fixture
+def open_logging_fs(monkeypatch):
+    from pyarrow.fs import PyFileSystem, LocalFileSystem, _normalize_path
+    from .test_fs import ProxyHandler
+
+    localfs = LocalFileSystem()
+
+    def normalized(paths):
+        return {_normalize_path(localfs, str(p)) for p in paths}
+
+    opened = set()
+
+    def open_input_file(self, path):
+        path = _normalize_path(localfs, str(path))
+        opened.add(path)
+        return self._fs.open_input_file(path)
+
+    # patch ProxyHandler to log calls to open_input_file
+    monkeypatch.setattr(ProxyHandler, "open_input_file", open_input_file)
+    fs = PyFileSystem(ProxyHandler(localfs))
+
+    @contextlib.contextmanager
+    def assert_opens(expected_opened):
+        opened.clear()
+        try:
+            yield
+        finally:
+            assert normalized(opened) == normalized(expected_opened)
+
+    return fs, assert_opens
+
+
 @pytest.fixture(scope='module')
 def multisourcefs(request):
     request.config.pyarrow.requires('pandas')
@@ -1625,6 +1657,42 @@ def test_parquet_dataset_factory_partitioned(tempdir):
 
 @pytest.mark.parquet
 @pytest.mark.pandas
+def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
+    fs, assert_opens = open_logging_fs
+
+    # Test to ensure that no IO happens when filtering a dataset
+    # created with ParquetDatasetFactory from a _metadata file
+
+    root_path = tempdir / "test_parquet_dataset_lazy_filtering"
+    metadata_path, _ = _create_parquet_dataset_simple(root_path)
+
+    # creating the dataset should only open the metadata file
+    with assert_opens([metadata_path]):
+        dataset = ds.parquet_dataset(
+            metadata_path,
+            partitioning=ds.partitioning(flavor="hive"),
+            filesystem=fs)
+
+    # materializing fragments should not open any file
+    with assert_opens([]):
+        fragments = list(dataset.get_fragments())
+
+    # filtering fragments should not open any file
+    with assert_opens([]):
+        list(dataset.get_fragments(ds.field("f1") > 15))
+
+    # splitting by row group should still not open any file
+    with assert_opens([]):
+        fragments[0].split_by_row_group(ds.field("f1") > 15)
+
+    # Actually scanning the dataset does open the data files.
+    # FIXME(bkietz): check disabled; on Windows it raises FileNotFoundError.
+    # with assert_opens([f.path for f in fragments]):
+    #    dataset.to_table()
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
 def test_dataset_schema_metadata(tempdir):
     # ARROW-8802
     df = pd.DataFrame({'a': [1, 2, 3]})
@@ -1639,3 +1707,25 @@ def test_dataset_schema_metadata(tempdir):
     assert b"pandas" in schema.metadata
     # ensure it is still there in a projected schema (with column selection)
     assert schema.equals(projected_schema, check_metadata=True)
+
+
+@pytest.mark.parquet
+def test_filter_mismatching_schema(tempdir):
+    # ARROW-9146
+    import pyarrow.parquet as pq
+
+    table = pa.table({"col": pa.array([1, 2, 3, 4], type='int32')})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    # specify an explicit schema that does not match the schema of the data
+    schema = pa.schema([("col", pa.int64())])
+    dataset = ds.dataset(
+        tempdir / "data.parquet", format="parquet", schema=schema)
+
+    # filtering on a column with such a type mismatch should raise a TypeError
+    with pytest.raises(TypeError):
+        dataset.to_table(filter=ds.field("col") > 2)
+
+    fragment = list(dataset.get_fragments())[0]
+    with pytest.raises(TypeError):
+        fragment.to_table(filter=ds.field("col") > 2, schema=schema)