You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2021/07/17 02:36:29 UTC
[arrow] branch master updated: ARROW-13153: [C++] `parquet_dataset`
loses ordering of files in `_metadata`
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new efb338d ARROW-13153: [C++] `parquet_dataset` loses ordering of files in `_metadata`
efb338d is described below
commit efb338d7ac45602b4c4a5663923c0f42224c3331
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri Jul 16 16:35:03 2021 -1000
ARROW-13153: [C++] `parquet_dataset` loses ordering of files in `_metadata`
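The ParquetDatasetFactory now stores the paths, each paired with its row group ids, in an ordered vector instead of an unordered map (a path-to-index map is used only while building that vector), so that it can create the dataset with the files in the same order in which they appear in `_metadata`. I added a test to test_dataset.py to confirm this.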
Note: While this does fix the issue when reading, the order of the files and rows produced when writing a dataset is still non-deterministic. This is probably inevitable if partitioning is present: grouping rows by partition destroys any ordering that previously existed. Furthermore, since the current implementation writes with multiple threads, there is no predictable order in which output files are created.
Closes #10636 from westonpace/bugfix/ARROW-13153--parquet_dataset-loses-ordering-of-files-in-_met
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/dataset/file_parquet.cc | 21 +++++++++++++--------
cpp/src/arrow/dataset/file_parquet.h | 6 +++---
python/pyarrow/tests/test_dataset.py | 22 ++++++++++++++++++++++
3 files changed, 38 insertions(+), 11 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 30ebc30..1228945 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -867,7 +867,8 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(*metadata, properties));
ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(*metadata, properties));
- std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids;
+ std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids;
+ std::unordered_map<std::string, int> paths_to_index;
for (int i = 0; i < metadata->num_row_groups(); i++) {
auto row_group = metadata->RowGroup(i);
@@ -877,22 +878,26 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
// Insert the path, or increase the count of row groups. It will be assumed that the
// RowGroup of a file are ordered exactly as in the metadata file.
- auto row_groups = &path_to_row_group_ids.insert({std::move(path), {}}).first->second;
- row_groups->emplace_back(i);
+ auto inserted_index = paths_to_index.emplace(
+ std::move(path), static_cast<int>(paths_with_row_group_ids.size()));
+ if (inserted_index.second) {
+ paths_with_row_group_ids.push_back({inserted_index.first->first, {}});
+ }
+ paths_with_row_group_ids[inserted_index.first->second].second.push_back(i);
}
return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
std::move(filesystem), std::move(format), std::move(metadata), std::move(manifest),
std::move(physical_schema), base_path, std::move(options),
- std::move(path_to_row_group_ids)));
+ std::move(paths_with_row_group_ids)));
}
Result<std::vector<std::shared_ptr<FileFragment>>>
ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning) {
- std::vector<std::shared_ptr<FileFragment>> fragments(path_to_row_group_ids_.size());
+ std::vector<std::shared_ptr<FileFragment>> fragments(paths_with_row_group_ids_.size());
size_t i = 0;
- for (const auto& e : path_to_row_group_ids_) {
+ for (const auto& e : paths_with_row_group_ids_) {
const auto& path = e.first;
auto metadata_subset = metadata_->Subset(e.second);
@@ -921,10 +926,10 @@ Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchem
if (auto factory = options_.partitioning.factory()) {
// Gather paths found in RowGroups' ColumnChunks.
- std::vector<std::string> stripped(path_to_row_group_ids_.size());
+ std::vector<std::string> stripped(paths_with_row_group_ids_.size());
size_t i = 0;
- for (const auto& e : path_to_row_group_ids_) {
+ for (const auto& e : paths_with_row_group_ids_) {
stripped[i++] = StripPrefixAndFilename(e.first, options_.partition_base_dir);
}
ARROW_ASSIGN_OR_RAISE(auto partition_schema, factory->Inspect(stripped));
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index da4fd58..d617309 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -352,7 +352,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
std::shared_ptr<Schema> physical_schema, std::string base_path,
ParquetFactoryOptions options,
- std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids)
+ std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids)
: filesystem_(std::move(filesystem)),
format_(std::move(format)),
metadata_(std::move(metadata)),
@@ -360,7 +360,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
physical_schema_(std::move(physical_schema)),
base_path_(std::move(base_path)),
options_(std::move(options)),
- path_to_row_group_ids_(std::move(path_to_row_group_ids)) {}
+ paths_with_row_group_ids_(std::move(paths_with_row_group_ids)) {}
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<ParquetFileFormat> format_;
@@ -369,7 +369,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
std::shared_ptr<Schema> physical_schema_;
std::string base_path_;
ParquetFactoryOptions options_;
- std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids_;
+ std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids_;
private:
Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index bf60b0f..6f96624 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2782,6 +2782,28 @@ def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset):
assert result.num_rows == 10
+def test_parquet_dataset_factory_order(tempdir):
+ # The order of the fragments in the dataset should match the order of the
+ # row groups in the _metadata file.
+ import pyarrow.parquet as pq
+ metadatas = []
+ # Create a dataset where f1 is incrementing from 0 to 100 spread across
+ # 10 files. Put the row groups in the correct order in _metadata
+ for i in range(10):
+ table = pa.table(
+ {'f1': list(range(i*10, (i+1)*10))})
+ table_path = tempdir / f'{i}.parquet'
+ pq.write_table(table, table_path, metadata_collector=metadatas)
+ metadatas[-1].set_file_path(f'{i}.parquet')
+ metadata_path = str(tempdir / '_metadata')
+ pq.write_metadata(table.schema, metadata_path, metadatas)
+ dataset = ds.parquet_dataset(metadata_path)
+ # Ensure the table contains values from 0-100 in the right order
+ scanned_table = dataset.to_table()
+ scanned_col = scanned_table.column('f1').to_pylist()
+ assert scanned_col == list(range(0, 100))
+
+
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_invalid(tempdir):