You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/08 14:53:17 UTC
[arrow] branch master updated: ARROW-16910: [C++] Add Equals method for FileFragment (#13490)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8116998fe0 ARROW-16910: [C++] Add Equals method for FileFragment (#13490)
8116998fe0 is described below
commit 8116998fe00b0e5d3998577bf442a10b19c66b3d
Author: Vibhatha Lakmal Abeykoon <vi...@users.noreply.github.com>
AuthorDate: Fri Jul 8 20:23:09 2022 +0530
ARROW-16910: [C++] Add Equals method for FileFragment (#13490)
Adding Equals method for `FileFragment`
Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
cpp/src/arrow/dataset/file_base.cc | 12 ++++++++++++
cpp/src/arrow/dataset/file_base.h | 10 ++++++++--
cpp/src/arrow/dataset/file_csv_test.cc | 2 ++
cpp/src/arrow/dataset/file_ipc_test.cc | 1 +
cpp/src/arrow/dataset/file_orc_test.cc | 1 +
cpp/src/arrow/dataset/file_parquet_test.cc | 2 ++
cpp/src/arrow/dataset/test_util.h | 14 ++++++++++++++
7 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 568ab41451..f50bbe1e0c 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -89,6 +89,14 @@ Result<std::shared_ptr<io::InputStream>> FileSource::OpenCompressed(
return io::CompressedInputStream::Make(codec.get(), std::move(file));
}
+// Equality comparison with another FileSource: true when the filesystems,
+// file infos, buffers and compression all match.
+//
+// A FileSource describes either a path on a filesystem or an in-memory buffer,
+// so filesystem_ and buffer_ are each compared null-safely: two null pointers
+// match, a null and a non-null pointer do not, and two non-null pointers are
+// compared through their own Equals().
+bool FileSource::Equals(const FileSource& other) const {
+  bool match_file_system =
+      (filesystem_ == nullptr && other.filesystem_ == nullptr) ||
+      (filesystem_ && other.filesystem_ && filesystem_->Equals(other.filesystem_));
+  // Guard the dereference: the path-based constructor takes no buffer, so
+  // buffer_ is presumably null for such sources.
+  bool match_buffer = (buffer_ == nullptr && other.buffer_ == nullptr) ||
+                      (buffer_ && other.buffer_ && buffer_->Equals(*other.buffer_));
+  return match_file_system && file_info_.Equals(other.file_info_) && match_buffer &&
+         compression_ == other.compression_;
+}
+
Future<util::optional<int64_t>> FileFormat::CountRows(
const std::shared_ptr<FileFragment>&, compute::Expression,
const std::shared_ptr<ScanOptions>&) {
@@ -135,6 +143,10 @@ Future<util::optional<int64_t>> FileFragment::CountRows(
return format()->CountRows(self, std::move(predicate), options);
}
+// Equality comparison with another FileFragment: both the underlying sources
+// and the file formats must compare equal.
+bool FileFragment::Equals(const FileFragment& other) const {
+  // The source is checked first; the format is only consulted when it matches.
+  if (!source_.Equals(other.source_)) {
+    return false;
+  }
+  return format_->Equals(*other.format_);
+}
+
struct FileSystemDataset::FragmentSubtrees {
// Forest for skipping fragments based on extracted subtree expressions
compute::Forest forest;
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 9eb3182c2f..7b0f5ffcf2 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -46,7 +46,7 @@ namespace dataset {
/// \brief The path and filesystem where an actual file is located or a buffer which can
/// be read like a file
-class ARROW_DS_EXPORT FileSource {
+class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
public:
FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
@@ -114,6 +114,9 @@ class ARROW_DS_EXPORT FileSource {
Result<std::shared_ptr<io::InputStream>> OpenCompressed(
util::optional<Compression::type> compression = util::nullopt) const;
+ /// \brief equality comparison with another FileSource
+ bool Equals(const FileSource& other) const;
+
private:
static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
return Status::Invalid("Called Open() on an uninitialized FileSource");
@@ -179,7 +182,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
};
/// \brief A Fragment that is stored in a file with a known format
-class ARROW_DS_EXPORT FileFragment : public Fragment {
+class ARROW_DS_EXPORT FileFragment : public Fragment,
+ public util::EqualityComparable<FileFragment> {
public:
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;
@@ -193,6 +197,8 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
const FileSource& source() const { return source_; }
const std::shared_ptr<FileFormat>& format() const { return format_; }
+ /// \brief equality comparison with another FileFragment
+ ///
+ /// Compares the fragments' sources and their file formats.
+ bool Equals(const FileFragment& other) const;
+
protected:
FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 2064c58148..76d2153cf2 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -359,6 +359,8 @@ TEST_P(TestCsvFileFormat, WriteRecordBatchReaderCustomOptions) {
TEST_P(TestCsvFileFormat, CountRows) { TestCountRows(); }
+TEST_P(TestCsvFileFormat, FragmentEquals) { TestFragmentEquals(); }
+
INSTANTIATE_TEST_SUITE_P(TestUncompressedCsv, TestCsvFileFormat,
::testing::Values(Compression::UNCOMPRESSED));
#ifdef ARROW_WITH_BZ2
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 35a2ef273f..3293024533 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -89,6 +89,7 @@ TEST_F(TestIpcFileFormat, InspectFailureWithRelevantError) {
TEST_F(TestIpcFileFormat, Inspect) { TestInspect(); }
TEST_F(TestIpcFileFormat, IsSupported) { TestIsSupported(); }
TEST_F(TestIpcFileFormat, CountRows) { TestCountRows(); }
+TEST_F(TestIpcFileFormat, FragmentEquals) { TestFragmentEquals(); }
class TestIpcFileSystemDataset : public testing::Test,
public WriteFileSystemDatasetMixin {
diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc
index aaa3aeff94..588878ce79 100644
--- a/cpp/src/arrow/dataset/file_orc_test.cc
+++ b/cpp/src/arrow/dataset/file_orc_test.cc
@@ -62,6 +62,7 @@ TEST_F(TestOrcFileFormat, InspectFailureWithRelevantError) {
TEST_F(TestOrcFileFormat, Inspect) { TestInspect(); }
TEST_F(TestOrcFileFormat, IsSupported) { TestIsSupported(); }
TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }
+TEST_F(TestOrcFileFormat, FragmentEquals) { TestFragmentEquals(); }
// TODO add TestOrcFileSystemDataset if write support is added
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 2c7cad8a7e..de048855cf 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -218,6 +218,8 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) {
TEST_F(TestParquetFileFormat, CountRows) { TestCountRows(); }
+TEST_F(TestParquetFileFormat, FragmentEquals) { TestFragmentEquals(); }
+
TEST_F(TestParquetFileFormat, CountRowsPredicatePushdown) {
constexpr int64_t kNumRowGroups = 16;
constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index b7fc66e2ae..0940974515 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -555,6 +555,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(predicate, predicate.Bind(*full_schema));
ASSERT_FINISHES_OK_AND_EQ(util::nullopt, fragment->CountRows(predicate, options));
}
+ // Checks FileFragment::Equals: a fragment equals itself, and differs from a
+ // fragment built from a source written with a different schema.
+ void TestFragmentEquals() {
+   auto this_schema = schema({field("f64", float64())});
+   auto other_schema = schema({field("f32", float32())});
+   auto reader = this->GetRecordBatchReader(this_schema);
+   auto other_reader = this->GetRecordBatchReader(other_schema);
+   auto source = this->GetFileSource(reader.get());
+   auto other_source = this->GetFileSource(other_reader.get());
+
+   auto fragment = this->MakeFragment(*source);
+   EXPECT_TRUE(fragment->Equals(*fragment));
+   auto other = this->MakeFragment(*other_source);
+   EXPECT_FALSE(fragment->Equals(*other));
+ }
protected:
std::shared_ptr<typename FormatHelper::FormatType> format_;