You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2021/07/17 02:36:29 UTC

[arrow] branch master updated: ARROW-13153: [C++] `parquet_dataset` loses ordering of files in `_metadata`

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new efb338d  ARROW-13153: [C++] `parquet_dataset` loses ordering of files in `_metadata`
efb338d is described below

commit efb338d7ac45602b4c4a5663923c0f42224c3331
Author: Weston Pace <we...@gmail.com>
AuthorDate: Fri Jul 16 16:35:03 2021 -1000

    ARROW-13153: [C++] `parquet_dataset` loses ordering of files in `_metadata`
    
    The ParquetDatasetFactory now stores the paths in a list alongside the path-to-id map so that it can create the dataset with a properly ordered set of paths.  I added a test to test_dataset.py to confirm this.
    
    Note: While this does fix the issue, writing a dataset is still non-deterministic.  This is probably inevitable if partitioning is present.  Grouping rows by a partition will destroy any ordering that previously existed.  Furthermore, since the current implementation writes with multiple threads, there is no predictable order in which output files are created.
    
    Closes #10636 from westonpace/bugfix/ARROW-13153--parquet_dataset-loses-ordering-of-files-in-_met
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/file_parquet.cc | 21 +++++++++++++--------
 cpp/src/arrow/dataset/file_parquet.h  |  6 +++---
 python/pyarrow/tests/test_dataset.py  | 22 ++++++++++++++++++++++
 3 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 30ebc30..1228945 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -867,7 +867,8 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
   ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(*metadata, properties));
   ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(*metadata, properties));
 
-  std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids;
+  std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids;
+  std::unordered_map<std::string, int> paths_to_index;
 
   for (int i = 0; i < metadata->num_row_groups(); i++) {
     auto row_group = metadata->RowGroup(i);
@@ -877,22 +878,26 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
 
     // Insert the path, or increase the count of row groups. It will be assumed that the
     // RowGroup of a file are ordered exactly as in the metadata file.
-    auto row_groups = &path_to_row_group_ids.insert({std::move(path), {}}).first->second;
-    row_groups->emplace_back(i);
+    auto inserted_index = paths_to_index.emplace(
+        std::move(path), static_cast<int>(paths_with_row_group_ids.size()));
+    if (inserted_index.second) {
+      paths_with_row_group_ids.push_back({inserted_index.first->first, {}});
+    }
+    paths_with_row_group_ids[inserted_index.first->second].second.push_back(i);
   }
 
   return std::shared_ptr<DatasetFactory>(new ParquetDatasetFactory(
       std::move(filesystem), std::move(format), std::move(metadata), std::move(manifest),
       std::move(physical_schema), base_path, std::move(options),
-      std::move(path_to_row_group_ids)));
+      std::move(paths_with_row_group_ids)));
 }
 
 Result<std::vector<std::shared_ptr<FileFragment>>>
 ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning) {
-  std::vector<std::shared_ptr<FileFragment>> fragments(path_to_row_group_ids_.size());
+  std::vector<std::shared_ptr<FileFragment>> fragments(paths_with_row_group_ids_.size());
 
   size_t i = 0;
-  for (const auto& e : path_to_row_group_ids_) {
+  for (const auto& e : paths_with_row_group_ids_) {
     const auto& path = e.first;
     auto metadata_subset = metadata_->Subset(e.second);
 
@@ -921,10 +926,10 @@ Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchem
 
   if (auto factory = options_.partitioning.factory()) {
     // Gather paths found in RowGroups' ColumnChunks.
-    std::vector<std::string> stripped(path_to_row_group_ids_.size());
+    std::vector<std::string> stripped(paths_with_row_group_ids_.size());
 
     size_t i = 0;
-    for (const auto& e : path_to_row_group_ids_) {
+    for (const auto& e : paths_with_row_group_ids_) {
       stripped[i++] = StripPrefixAndFilename(e.first, options_.partition_base_dir);
     }
     ARROW_ASSIGN_OR_RAISE(auto partition_schema, factory->Inspect(stripped));
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index da4fd58..d617309 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -352,7 +352,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
       std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
       std::shared_ptr<Schema> physical_schema, std::string base_path,
       ParquetFactoryOptions options,
-      std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids)
+      std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids)
       : filesystem_(std::move(filesystem)),
         format_(std::move(format)),
         metadata_(std::move(metadata)),
@@ -360,7 +360,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
         physical_schema_(std::move(physical_schema)),
         base_path_(std::move(base_path)),
         options_(std::move(options)),
-        path_to_row_group_ids_(std::move(path_to_row_group_ids)) {}
+        paths_with_row_group_ids_(std::move(paths_with_row_group_ids)) {}
 
   std::shared_ptr<fs::FileSystem> filesystem_;
   std::shared_ptr<ParquetFileFormat> format_;
@@ -369,7 +369,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
   std::shared_ptr<Schema> physical_schema_;
   std::string base_path_;
   ParquetFactoryOptions options_;
-  std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids_;
+  std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids_;
 
  private:
   Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index bf60b0f..6f96624 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2782,6 +2782,28 @@ def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset):
     assert result.num_rows == 10
 
 
+def test_parquet_dataset_factory_order(tempdir):
+    # The order of the fragments in the dataset should match the order of the
+    # row groups in the _metadata file.
+    import pyarrow.parquet as pq
+    metadatas = []
+    # Create a dataset where f1 is incrementing from 0 to 99 spread across
+    # 10 files.  Put the row groups in the correct order in _metadata
+    for i in range(10):
+        table = pa.table(
+            {'f1': list(range(i*10, (i+1)*10))})
+        table_path = tempdir / f'{i}.parquet'
+        pq.write_table(table, table_path, metadata_collector=metadatas)
+        metadatas[-1].set_file_path(f'{i}.parquet')
+    metadata_path = str(tempdir / '_metadata')
+    pq.write_metadata(table.schema, metadata_path, metadatas)
+    dataset = ds.parquet_dataset(metadata_path)
+    # Ensure the table contains values from 0-99 in the right order
+    scanned_table = dataset.to_table()
+    scanned_col = scanned_table.column('f1').to_pylist()
+    assert scanned_col == list(range(0, 100))
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_parquet_dataset_factory_invalid(tempdir):