You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/08 14:53:17 UTC

[arrow] branch master updated: ARROW-16910: [C++] Add Equals method for FileFragment (#13490)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8116998fe0 ARROW-16910: [C++] Add Equals method for FileFragment (#13490)
8116998fe0 is described below

commit 8116998fe00b0e5d3998577bf442a10b19c66b3d
Author: Vibhatha Lakmal Abeykoon <vi...@users.noreply.github.com>
AuthorDate: Fri Jul 8 20:23:09 2022 +0530

    ARROW-16910: [C++] Add Equals method for FileFragment (#13490)
    
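    Adds `Equals` methods to `FileSource` and `FileFragment`, plus a shared `FragmentEquals` test for the CSV, IPC, ORC and Parquet file formats.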
    
    Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/dataset/file_base.cc         | 12 ++++++++++++
 cpp/src/arrow/dataset/file_base.h          | 10 ++++++++--
 cpp/src/arrow/dataset/file_csv_test.cc     |  2 ++
 cpp/src/arrow/dataset/file_ipc_test.cc     |  1 +
 cpp/src/arrow/dataset/file_orc_test.cc     |  1 +
 cpp/src/arrow/dataset/file_parquet_test.cc |  2 ++
 cpp/src/arrow/dataset/test_util.h          | 14 ++++++++++++++
 7 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 568ab41451..f50bbe1e0c 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -89,6 +89,23 @@ Result<std::shared_ptr<io::InputStream>> FileSource::OpenCompressed(
   return io::CompressedInputStream::Make(codec.get(), std::move(file));
 }
 
+// Equality comparison with another FileSource.
+//
+// Two sources are equal when their filesystems, file infos, buffers and compression
+// all match. An unset filesystem or buffer only matches another unset one.
+bool FileSource::Equals(const FileSource& other) const {
+  const bool match_file_system =
+      (filesystem_ == nullptr && other.filesystem_ == nullptr) ||
+      (filesystem_ && other.filesystem_ && filesystem_->Equals(other.filesystem_));
+  // Either side may have no buffer (presumably the case for path-backed sources),
+  // so check for null before dereferencing to compare.
+  const bool match_buffer =
+      (buffer_ == nullptr && other.buffer_ == nullptr) ||
+      (buffer_ && other.buffer_ && buffer_->Equals(*other.buffer_));
+  return match_file_system && match_buffer && file_info_.Equals(other.file_info_) &&
+         compression_ == other.compression_;
+}
+
 Future<util::optional<int64_t>> FileFormat::CountRows(
     const std::shared_ptr<FileFragment>&, compute::Expression,
     const std::shared_ptr<ScanOptions>&) {
@@ -135,6 +143,15 @@ Future<util::optional<int64_t>> FileFragment::CountRows(
   return format()->CountRows(self, std::move(predicate), options);
 }
 
+// Fragments compare equal when both their file sources and their formats do; no
+// other members are consulted.
+bool FileFragment::Equals(const FileFragment& other) const {
+  if (!source_.Equals(other.source_)) {
+    return false;
+  }
+  return format_->Equals(*other.format_);
+}
+
 struct FileSystemDataset::FragmentSubtrees {
   // Forest for skipping fragments based on extracted subtree expressions
   compute::Forest forest;
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 9eb3182c2f..7b0f5ffcf2 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -46,7 +46,7 @@ namespace dataset {
 
 /// \brief The path and filesystem where an actual file is located or a buffer which can
 /// be read like a file
-class ARROW_DS_EXPORT FileSource {
+class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
  public:
   FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
              Compression::type compression = Compression::UNCOMPRESSED)
@@ -114,6 +114,9 @@ class ARROW_DS_EXPORT FileSource {
   Result<std::shared_ptr<io::InputStream>> OpenCompressed(
       util::optional<Compression::type> compression = util::nullopt) const;
 
+  /// \brief Whether filesystem, file info, buffer and compression match `other`
+  bool Equals(const FileSource& other) const;
+
  private:
   static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
     return Status::Invalid("Called Open() on an uninitialized FileSource");
@@ -179,7 +182,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
 };
 
 /// \brief A Fragment that is stored in a file with a known format
-class ARROW_DS_EXPORT FileFragment : public Fragment {
+class ARROW_DS_EXPORT FileFragment : public Fragment,
+                                     public util::EqualityComparable<FileFragment> {
  public:
   Result<RecordBatchGenerator> ScanBatchesAsync(
       const std::shared_ptr<ScanOptions>& options) override;
@@ -193,6 +197,9 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
   const FileSource& source() const { return source_; }
   const std::shared_ptr<FileFormat>& format() const { return format_; }
 
+  /// \brief Whether this fragment's source and format equal those of `other`
+  bool Equals(const FileFragment& other) const;
+
  protected:
   FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                compute::Expression partition_expression,
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 2064c58148..76d2153cf2 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -359,6 +359,8 @@ TEST_P(TestCsvFileFormat, WriteRecordBatchReaderCustomOptions) {
 
 TEST_P(TestCsvFileFormat, CountRows) { TestCountRows(); }
 
+TEST_P(TestCsvFileFormat, FragmentEquals) { TestFragmentEquals(); }
+
 INSTANTIATE_TEST_SUITE_P(TestUncompressedCsv, TestCsvFileFormat,
                          ::testing::Values(Compression::UNCOMPRESSED));
 #ifdef ARROW_WITH_BZ2
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 35a2ef273f..3293024533 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -89,6 +89,7 @@ TEST_F(TestIpcFileFormat, InspectFailureWithRelevantError) {
 TEST_F(TestIpcFileFormat, Inspect) { TestInspect(); }
 TEST_F(TestIpcFileFormat, IsSupported) { TestIsSupported(); }
 TEST_F(TestIpcFileFormat, CountRows) { TestCountRows(); }
+TEST_F(TestIpcFileFormat, FragmentEquals) { TestFragmentEquals(); }
 
 class TestIpcFileSystemDataset : public testing::Test,
                                  public WriteFileSystemDatasetMixin {
diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc
index aaa3aeff94..588878ce79 100644
--- a/cpp/src/arrow/dataset/file_orc_test.cc
+++ b/cpp/src/arrow/dataset/file_orc_test.cc
@@ -62,6 +62,7 @@ TEST_F(TestOrcFileFormat, InspectFailureWithRelevantError) {
 TEST_F(TestOrcFileFormat, Inspect) { TestInspect(); }
 TEST_F(TestOrcFileFormat, IsSupported) { TestIsSupported(); }
 TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }
+TEST_F(TestOrcFileFormat, FragmentEquals) { TestFragmentEquals(); }
 
 // TODO add TestOrcFileSystemDataset if write support is added
 
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 2c7cad8a7e..de048855cf 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -218,6 +218,8 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) {
 
 TEST_F(TestParquetFileFormat, CountRows) { TestCountRows(); }
 
+TEST_F(TestParquetFileFormat, FragmentEquals) { TestFragmentEquals(); }
+
 TEST_F(TestParquetFileFormat, CountRowsPredicatePushdown) {
   constexpr int64_t kNumRowGroups = 16;
   constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index b7fc66e2ae..0940974515 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -555,6 +555,22 @@ class FileFormatFixtureMixin : public ::testing::Test {
     ASSERT_OK_AND_ASSIGN(predicate, predicate.Bind(*full_schema));
     ASSERT_FINISHES_OK_AND_EQ(util::nullopt, fragment->CountRows(predicate, options));
   }
+
+  // Check FileFragment::Equals: a fragment equals itself, and differs from a
+  // fragment made from a file source obtained from a reader with another schema.
+  void TestFragmentEquals() {
+    auto this_schema = schema({field("f64", float64())});
+    auto other_schema = schema({field("f32", float32())});
+    auto reader = this->GetRecordBatchReader(this_schema);
+    auto other_reader = this->GetRecordBatchReader(other_schema);
+    auto source = this->GetFileSource(reader.get());
+    auto other_source = this->GetFileSource(other_reader.get());
+
+    auto fragment = this->MakeFragment(*source);
+    EXPECT_TRUE(fragment->Equals(*fragment));
+    auto other = this->MakeFragment(*other_source);
+    EXPECT_FALSE(fragment->Equals(*other));
+  }
 
  protected:
   std::shared_ptr<typename FormatHelper::FormatType> format_;