You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/27 00:08:46 UTC

[arrow] branch master updated: ARROW-16302: [C++] Null values in partitioning field for FilenamePartitioning

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new adb5b00023 ARROW-16302: [C++] Null values in partitioning field for FilenamePartitioning
adb5b00023 is described below

commit adb5b00023bf57122c6de540746008eff2d1ad92
Author: Sanjiban Sengupta <sa...@gmail.com>
AuthorDate: Thu May 26 14:08:08 2022 -1000

    ARROW-16302: [C++] Null values in partitioning field for FilenamePartitioning
    
    This PR fixes partition fields being read back as null when a dataset is discovered with FilenamePartitioning.
    FilenamePartitioning encodes the partition values in the file name, so only the base-directory prefix should be removed from the path before parsing. `StripPrefixAndFilename()` must not be used for this, because it removes the file name along with the prefix.
    
    Closes #12977 from sanjibansg/fix-FilenamePartitioning
    
    Authored-by: Sanjiban Sengupta <sa...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/discovery.cc      |   2 +-
 cpp/src/arrow/dataset/file_base.cc      |  20 ++---
 cpp/src/arrow/dataset/file_parquet.cc   |   2 +-
 cpp/src/arrow/dataset/partition.cc      |  32 ++++----
 cpp/src/arrow/dataset/partition.h       |  16 ++--
 cpp/src/arrow/dataset/partition_test.cc | 133 ++++++++++++++++++--------------
 cpp/src/arrow/filesystem/path_util.cc   |   8 +-
 python/pyarrow/dataset.py               |   1 -
 python/pyarrow/tests/test_dataset.py    |  28 +++++--
 9 files changed, 143 insertions(+), 99 deletions(-)

diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index f8382aa405..25fa7ff2b7 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -279,7 +279,7 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
 
   std::vector<std::shared_ptr<FileFragment>> fragments;
   for (const auto& info : files_) {
-    auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
+    auto fixed_path = StripPrefix(info.path(), options_.partition_base_dir);
     ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
     ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
     fragments.push_back(fragment);
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 1027781057..2e05706bbb 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -270,11 +270,11 @@ Future<> FileWriter::Finish() {
 
 namespace {
 
-Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
-                  FileSystemDatasetWriteOptions write_options,
-                  std::function<Status(std::shared_ptr<RecordBatch>,
-                                       const Partitioning::PartitionPathFormat&)>
-                      write) {
+Status WriteBatch(
+    std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
+    FileSystemDatasetWriteOptions write_options,
+    std::function<Status(std::shared_ptr<RecordBatch>, const PartitionPathFormat&)>
+        write) {
   ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
   batch.reset();  // drop to hopefully conserve memory
 
@@ -292,7 +292,7 @@ Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guaran
   for (std::size_t index = 0; index < groups.batches.size(); index++) {
     auto partition_expression = and_(groups.expressions[index], guarantee);
     auto next_batch = groups.batches[index];
-    Partitioning::PartitionPathFormat destination;
+    PartitionPathFormat destination;
     ARROW_ASSIGN_OR_RAISE(destination,
                           write_options.partitioning->Format(partition_expression));
     RETURN_NOT_OK(write(next_batch, destination));
@@ -337,10 +337,10 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
     return WriteBatch(
         batch, guarantee, write_options_,
         [this](std::shared_ptr<RecordBatch> next_batch,
-               const Partitioning::PartitionPathFormat& destination) {
+               const PartitionPathFormat& destination) {
           return task_group_.AddTask([this, next_batch, destination] {
             Future<> has_room = dataset_writer_->WriteRecordBatch(
-                next_batch, destination.directory, destination.prefix);
+                next_batch, destination.directory, destination.filename);
             if (!has_room.is_finished()) {
               // We don't have to worry about sequencing backpressure here since
               // task_group_ serves as our sequencer.  If batches continue to arrive after
@@ -481,11 +481,11 @@ class TeeNode : public compute::MapNode {
                         compute::Expression guarantee) {
     return WriteBatch(batch, guarantee, write_options_,
                       [this](std::shared_ptr<RecordBatch> next_batch,
-                             const Partitioning::PartitionPathFormat& destination) {
+                             const PartitionPathFormat& destination) {
                         return task_group_.AddTask([this, next_batch, destination] {
                           util::tracing::Span span;
                           Future<> has_room = dataset_writer_->WriteRecordBatch(
-                              next_batch, destination.directory, destination.prefix);
+                              next_batch, destination.directory, destination.filename);
                           if (!has_room.is_finished()) {
                             this->Pause();
                             return has_room.Then([this] { this->Resume(); });
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 9f914b57f0..f2a3903208 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -894,7 +894,7 @@ ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning)
     auto row_groups = Iota(metadata_subset->num_row_groups());
 
     auto partition_expression =
-        partitioning.Parse(StripPrefixAndFilename(path, options_.partition_base_dir))
+        partitioning.Parse(StripPrefix(path, options_.partition_base_dir))
             .ValueOr(compute::literal(true));
 
     ARROW_ASSIGN_OR_RAISE(
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 763ea0bc6c..6b4d601db0 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -263,7 +263,7 @@ Result<compute::Expression> KeyValuePartitioning::Parse(const std::string& path)
   return and_(std::move(expressions));
 }
 
-Result<Partitioning::PartitionPathFormat> KeyValuePartitioning::Format(
+Result<PartitionPathFormat> KeyValuePartitioning::Format(
     const compute::Expression& expr) const {
   ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};
 
@@ -377,7 +377,8 @@ DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
 
 Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
     const std::string& path) const {
-  std::vector<std::string> segments = fs::internal::SplitAbstractPath(path);
+  std::vector<std::string> segments =
+      fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first);
   return ParsePartitionSegments(segments);
 }
 
@@ -390,23 +391,24 @@ FilenamePartitioning::FilenamePartitioning(std::shared_ptr<Schema> schema,
 
 Result<std::vector<KeyValuePartitioning::Key>> FilenamePartitioning::ParseKeys(
     const std::string& path) const {
-  std::vector<std::string> segments =
-      fs::internal::SplitAbstractPath(StripNonPrefix(path), kFilenamePartitionSep);
+  std::vector<std::string> segments = fs::internal::SplitAbstractPath(
+      StripNonPrefix(fs::internal::GetAbstractPathParent(path).second),
+      kFilenamePartitionSep);
   return ParsePartitionSegments(segments);
 }
 
-Result<Partitioning::PartitionPathFormat> DirectoryPartitioning::FormatValues(
+Result<PartitionPathFormat> DirectoryPartitioning::FormatValues(
     const ScalarVector& values) const {
   std::vector<std::string> segments;
   ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values));
   return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
 }
 
-Result<Partitioning::PartitionPathFormat> FilenamePartitioning::FormatValues(
+Result<PartitionPathFormat> FilenamePartitioning::FormatValues(
     const ScalarVector& values) const {
   std::vector<std::string> segments;
   ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values));
-  return Partitioning::PartitionPathFormat{
+  return PartitionPathFormat{
       "", fs::internal::JoinAbstractPath(std::move(segments), kFilenamePartitionSep) +
               kFilenamePartitionSep};
 }
@@ -721,7 +723,8 @@ Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
     const std::string& path) const {
   std::vector<Key> keys;
 
-  for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
+  for (const auto& segment :
+       fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first)) {
     ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
     if (auto key = maybe_key) {
       keys.push_back(std::move(*key));
@@ -731,7 +734,7 @@ Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
   return keys;
 }
 
-Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
+Result<PartitionPathFormat> HivePartitioning::FormatValues(
     const ScalarVector& values) const {
   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
 
@@ -749,8 +752,7 @@ Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
     }
   }
 
-  return Partitioning::PartitionPathFormat{
-      fs::internal::JoinAbstractPath(std::move(segments)), ""};
+  return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
 }
 
 class HivePartitioningFactory : public KeyValuePartitioningFactory {
@@ -805,9 +807,14 @@ std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
   return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
 }
 
-std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
+std::string StripPrefix(const std::string& path, const std::string& prefix) {
   auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
   auto base_less = maybe_base_less ? std::string(*maybe_base_less) : path;
+  return base_less;
+}
+
+std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
+  auto base_less = StripPrefix(path, prefix);
   auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
   return basename_filename.first;
 }
@@ -837,7 +844,6 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
   if (auto part = partitioning()) {
     return part->schema();
   }
-
   return factory()->Inspect(paths);
 }
 
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 6a679f3482..cd225f1378 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -38,6 +38,10 @@ namespace dataset {
 
 constexpr char kFilenamePartitionSep = '_';
 
+struct ARROW_DS_EXPORT PartitionPathFormat {
+  std::string directory, filename;
+};
+
 // ----------------------------------------------------------------------
 // Partitioning
 
@@ -78,10 +82,6 @@ class ARROW_DS_EXPORT Partitioning {
   /// \brief Parse a path into a partition expression
   virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
 
-  struct PartitionPathFormat {
-    std::string directory, prefix;
-  };
-
   virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
 
   /// \brief A default Partitioning which always yields scalar(true)
@@ -358,9 +358,13 @@ class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
   Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
 };
 
-/// \brief Remove a prefix and the filename of a path.
+ARROW_DS_EXPORT std::string StripPrefix(const std::string& path,
+                                        const std::string& prefix);
+
+/// \brief Remove a prefix and the filename of a path, leaving only the directory
 ///
-/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> "year=2019"`
+/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") ->
+/// "year=2019"`
 ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
                                                    const std::string& prefix);
 
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 87f6f89ccd..86b8c4f0b9 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -58,18 +58,20 @@ class TestPartitioning : public ::testing::Test {
   }
 
   void AssertFormat(compute::Expression expr, const std::string& expected_directory,
-                    const std::string& expected_prefix = "") {
+                    const std::string& expected_filename = "") {
     // formatted partition expressions are bound to the schema of the dataset being
     // written
     ASSERT_OK_AND_ASSIGN(auto formatted, partitioning_->Format(expr));
     ASSERT_EQ(formatted.directory, expected_directory);
-    ASSERT_EQ(formatted.prefix, expected_prefix);
+    ASSERT_EQ(formatted.filename, expected_filename);
 
+    // if ((formatted.filename).empty()){
+    //   formatted.filename = "format.parquet";
+    // }
     // ensure the formatted path round trips the relevant components of the partition
     // expression: roundtripped should be a subset of expr
     ASSERT_OK_AND_ASSIGN(compute::Expression roundtripped,
-                         partitioning_->Parse(formatted.directory));
-
+                         partitioning_->Parse(formatted.directory + formatted.filename));
     ASSERT_OK_AND_ASSIGN(roundtripped, roundtripped.Bind(*written_schema_));
     ASSERT_OK_AND_ASSIGN(auto simplified, SimplifyWithGuarantee(roundtripped, expr));
     ASSERT_EQ(simplified, literal(true));
@@ -188,22 +190,38 @@ TEST_F(TestPartitioning, DirectoryPartitioning) {
   partitioning_ = std::make_shared<DirectoryPartitioning>(
       schema({field("alpha", int32()), field("beta", utf8())}));
 
-  AssertParse("/0/hello", and_(equal(field_ref("alpha"), literal(0)),
-                               equal(field_ref("beta"), literal("hello"))));
-  AssertParse("/3", equal(field_ref("alpha"), literal(3)));
-  AssertParseError("/world/0");    // reversed order
-  AssertParseError("/0.0/foo");    // invalid alpha
-  AssertParseError("/3.25");       // invalid alpha with missing beta
+  AssertParse("/0/hello/", and_(equal(field_ref("alpha"), literal(0)),
+                                equal(field_ref("beta"), literal("hello"))));
+  AssertParse("/3/", equal(field_ref("alpha"), literal(3)));
+  AssertParseError("/world/0/");   // reversed order
+  AssertParseError("/0.0/foo/");   // invalid alpha
+  AssertParseError("/3.25/");      // invalid alpha with missing beta
   AssertParse("", literal(true));  // no segments to parse
 
   // gotcha someday:
-  AssertParse("/0/dat.parquet", and_(equal(field_ref("alpha"), literal(0)),
-                                     equal(field_ref("beta"), literal("dat.parquet"))));
+  AssertParse("/0/dat.parquet/", and_(equal(field_ref("alpha"), literal(0)),
+                                      equal(field_ref("beta"), literal("dat.parquet"))));
 
   AssertParse("/0/foo/ignored=2341", and_(equal(field_ref("alpha"), literal(0)),
                                           equal(field_ref("beta"), literal("foo"))));
 }
 
+TEST_F(TestPartitioning, FilenamePartitioning) {
+  partitioning_ = std::make_shared<FilenamePartitioning>(
+      schema({field("alpha", int32()), field("beta", utf8())}));
+
+  AssertParse("0_hello_", and_(equal(field_ref("alpha"), literal(0)),
+                               equal(field_ref("beta"), literal("hello"))));
+  AssertParse("0_", equal(field_ref("alpha"), literal(0)));
+  AssertParseError("world_0_");    // reversed order
+  AssertParseError("0.0_foo_");    // invalid alpha
+  AssertParseError("3.25_");       // invalid alpha with missing beta
+  AssertParse("", literal(true));  // no segments to parse
+
+  AssertParse("0_foo_ignored=2341", and_(equal(field_ref("alpha"), literal(0)),
+                                         equal(field_ref("beta"), literal("foo"))));
+}
+
 TEST_F(TestPartitioning, DirectoryPartitioningFormat) {
   partitioning_ = std::make_shared<DirectoryPartitioning>(
       schema({field("alpha", int32()), field("beta", utf8())}));
@@ -267,7 +285,7 @@ TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) {
         schema({field("year", int32()), field("month", int8()), field("day", temporal)}));
 
     ASSERT_OK_AND_ASSIGN(auto day, StringScalar("2020-06-08").CastTo(temporal));
-    AssertParse("/2020/06/2020-06-08",
+    AssertParse("/2020/06/2020-06-08/",
                 and_({equal(field_ref("year"), literal(2020)),
                       equal(field_ref("month"), literal<int8_t>(6)),
                       equal(field_ref("day"), literal(day))}));
@@ -368,11 +386,11 @@ TEST_F(TestPartitioning, DictionaryHasUniqueValues) {
     auto dictionary_scalar =
         std::make_shared<DictionaryScalar>(index_and_dictionary, alpha->type());
 
-    auto path = "/" + expected_dictionary->GetString(i);
+    auto path = "/" + expected_dictionary->GetString(i) + "/";
     AssertParse(path, equal(field_ref("alpha"), literal(dictionary_scalar)));
   }
 
-  AssertParseError("/yosemite");  // not in inspected dictionary
+  AssertParseError("/yosemite/");  // not in inspected dictionary
 }
 
 TEST_F(TestPartitioning, DiscoverSchemaSegfault) {
@@ -385,27 +403,27 @@ TEST_F(TestPartitioning, HivePartitioning) {
   partitioning_ = std::make_shared<HivePartitioning>(
       schema({field("alpha", int32()), field("beta", float32())}), ArrayVector(), "xyz");
 
-  AssertParse("/alpha=0/beta=3.25", and_(equal(field_ref("alpha"), literal(0)),
-                                         equal(field_ref("beta"), literal(3.25f))));
-  AssertParse("/beta=3.25/alpha=0", and_(equal(field_ref("beta"), literal(3.25f)),
-                                         equal(field_ref("alpha"), literal(0))));
-  AssertParse("/alpha=0", equal(field_ref("alpha"), literal(0)));
-  AssertParse("/alpha=xyz/beta=3.25", and_(is_null(field_ref("alpha")),
-                                           equal(field_ref("beta"), literal(3.25f))));
-  AssertParse("/beta=3.25", equal(field_ref("beta"), literal(3.25f)));
+  AssertParse("/alpha=0/beta=3.25/", and_(equal(field_ref("alpha"), literal(0)),
+                                          equal(field_ref("beta"), literal(3.25f))));
+  AssertParse("/beta=3.25/alpha=0/", and_(equal(field_ref("beta"), literal(3.25f)),
+                                          equal(field_ref("alpha"), literal(0))));
+  AssertParse("/alpha=0/", equal(field_ref("alpha"), literal(0)));
+  AssertParse("/alpha=xyz/beta=3.25/", and_(is_null(field_ref("alpha")),
+                                            equal(field_ref("beta"), literal(3.25f))));
+  AssertParse("/beta=3.25/", equal(field_ref("beta"), literal(3.25f)));
   AssertParse("", literal(true));
 
-  AssertParse("/alpha=0/unexpected/beta=3.25",
+  AssertParse("/alpha=0/beta=3.25/ignored=2341/",
               and_(equal(field_ref("alpha"), literal(0)),
                    equal(field_ref("beta"), literal(3.25f))));
 
-  AssertParse("/alpha=0/beta=3.25/ignored=2341",
+  AssertParse("/alpha=0/beta=3.25/ignored=2341/",
               and_(equal(field_ref("alpha"), literal(0)),
                    equal(field_ref("beta"), literal(3.25f))));
 
-  AssertParse("/ignored=2341", literal(true));
+  AssertParse("/ignored=2341/", literal(true));
 
-  AssertParseError("/alpha=0.0/beta=3.25");  // conversion of "0.0" to int32 fails
+  AssertParseError("/alpha=0.0/beta=3.25/");  // conversion of "0.0" to int32 fails
 }
 
 TEST_F(TestPartitioning, HivePartitioningFormat) {
@@ -542,11 +560,11 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
     auto dictionary_scalar =
         std::make_shared<DictionaryScalar>(index_and_dictionary, alpha->type());
 
-    auto path = "/alpha=" + expected_dictionary->GetString(i);
+    auto path = "/alpha=" + expected_dictionary->GetString(i) + "/";
     AssertParse(path, equal(field_ref("alpha"), literal(dictionary_scalar)));
   }
 
-  AssertParseError("/alpha=yosemite");  // not in inspected dictionary
+  AssertParseError("/alpha=yosemite/");  // not in inspected dictionary
 }
 
 TEST_F(TestPartitioning, ExistingSchemaDirectory) {
@@ -647,7 +665,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) {
   auto date = std::make_shared<TimestampScalar>(1620086400, ts);
   auto time = std::make_shared<TimestampScalar>(1620113220, ts);
   partitioning_ = std::make_shared<DirectoryPartitioning>(options.schema, ArrayVector());
-  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24/",
               and_({equal(field_ref("date"), literal(date)),
                     equal(field_ref("time"), literal(time)),
                     equal(field_ref("str"), literal("$"))}));
@@ -656,7 +674,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) {
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
                                   factory_->Inspect({"/%AF/%BF/%CF"}));
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
-                                  partitioning_->Parse({"/%AF/%BF/%CF"}));
+                                  partitioning_->Parse("/%AF/%BF/%CF"));
 
   options.segment_encoding = SegmentEncoding::None;
   options.schema =
@@ -667,7 +685,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) {
                 options.schema->fields());
   partitioning_ = std::make_shared<DirectoryPartitioning>(
       options.schema, ArrayVector(), options.AsPartitioningOptions());
-  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24/",
               and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
                     equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
                     equal(field_ref("str"), literal("%24"))}));
@@ -690,15 +708,16 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
   auto time = std::make_shared<TimestampScalar>(1620113220, ts);
   partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
                                                      options.AsHivePartitioningOptions());
-  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$/",
               and_({equal(field_ref("date"), literal(date)),
                     equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
-  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
-              and_({equal(field_ref("date"), literal(date)),
-                    equal(field_ref("time"), literal(time)),
-                    equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
+  AssertParse(
+      "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE/",
+      and_({equal(field_ref("date"), literal(date)),
+            equal(field_ref("time"), literal(time)),
+            equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
   // URL-encoded null fallback value
-  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24/",
               and_({equal(field_ref("date"), literal(date)),
                     equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
 
@@ -706,7 +725,7 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
                                   factory_->Inspect({"/date=%AF/time=%BF/str=%CF"}));
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
-                                  partitioning_->Parse({"/date=%AF/time=%BF/str=%CF"}));
+                                  partitioning_->Parse("/date=%AF/time=%BF/str=%CF"));
 
   options.segment_encoding = SegmentEncoding::None;
   options.schema =
@@ -719,7 +738,7 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
       options.schema->fields());
   partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
                                                      options.AsHivePartitioningOptions());
-  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24/",
               and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
                     equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
                     equal(field_ref("str"), literal("%24"))}));
@@ -727,9 +746,8 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
   // Invalid UTF-8
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
                                   factory_->Inspect({"/date=\xAF/time=\xBF/str=\xCF"}));
-  EXPECT_RAISES_WITH_MESSAGE_THAT(
-      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
-      partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  partitioning_->Parse("/date=\xAF/time=\xBF/str=\xCF"));
 }
 
 TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
@@ -754,19 +772,19 @@ TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
                                                      options.AsHivePartitioningOptions());
   AssertParse(
       "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
-      "07:27:00/str=$",
+      "07:27:00/str=$/",
       and_({equal(field_ref("test'; date"), literal(date)),
             equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
   AssertParse(
       "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
-      "07:27:00/str=%E3%81%8F%E3%81%BE",
+      "07:27:00/str=%E3%81%8F%E3%81%BE/",
       and_({equal(field_ref("test'; date"), literal(date)),
             equal(field_ref("test'; time"), literal(time)),
             equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
   // URL-encoded null fallback value
   AssertParse(
       "/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 "
-      "07%3A27%3A00/str=%24",
+      "07%3A27%3A00/str=%24/",
       and_({equal(field_ref("test'; date"), literal(date)),
             equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
 
@@ -776,7 +794,7 @@ TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
       factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
   EXPECT_RAISES_WITH_MESSAGE_THAT(
       Invalid, ::testing::HasSubstr("was not valid UTF-8"),
-      partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
+      partitioning_->Parse("/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24/"));
 }
 
 TEST_F(TestPartitioning, EtlThenHive) {
@@ -801,17 +819,17 @@ TEST_F(TestPartitioning, EtlThenHive) {
         auto etl_segments_end = segments.begin() + etl_fields.size();
         auto etl_path =
             fs::internal::JoinAbstractPath(segments.begin(), etl_segments_end);
-        ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path));
+        ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path + "/"));
 
         auto alphabeta_segments_end = etl_segments_end + alphabeta_fields.size();
         auto alphabeta_path =
             fs::internal::JoinAbstractPath(etl_segments_end, alphabeta_segments_end);
-        ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr, alphabeta_part.Parse(alphabeta_path));
-
+        ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr,
+                              alphabeta_part.Parse(alphabeta_path + "/"));
         return and_(etl_expr, alphabeta_expr);
       });
 
-  AssertParse("/1999/12/31/00/alpha=0/beta=3.25",
+  AssertParse("/1999/12/31/00/alpha=0/beta=3.25/",
               and_({equal(field_ref("year"), literal<int16_t>(1999)),
                     equal(field_ref("month"), literal<int8_t>(12)),
                     equal(field_ref("day"), literal<int8_t>(31)),
@@ -819,7 +837,7 @@ TEST_F(TestPartitioning, EtlThenHive) {
                     and_(equal(field_ref("alpha"), literal<int32_t>(0)),
                          equal(field_ref("beta"), literal<float>(3.25f)))}));
 
-  AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25");
+  AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25/");
 }
 
 TEST_F(TestPartitioning, Set) {
@@ -919,9 +937,8 @@ class RangePartitioning : public Partitioning {
     return Status::OK();
   }
 
-  Result<Partitioning::PartitionPathFormat> Format(
-      const compute::Expression&) const override {
-    return Partitioning::PartitionPathFormat{"", ""};
+  Result<PartitionPathFormat> Format(const compute::Expression&) const override {
+    return PartitionPathFormat{"", ""};
   }
   Result<PartitionedBatches> Partition(
       const std::shared_ptr<RecordBatch>&) const override {
@@ -953,9 +970,9 @@ TEST(TestStripPrefixAndFilename, Basic) {
   std::vector<std::string> input{"/data/year=2019/file.parquet",
                                  "/data/year=2019/month=12/file.parquet",
                                  "/data/year=2019/month=12/day=01/file.parquet"};
-  EXPECT_THAT(StripPrefixAndFilename(input, "/data"),
-              testing::ElementsAre("year=2019", "year=2019/month=12",
-                                   "year=2019/month=12/day=01"));
+  auto paths = StripPrefixAndFilename(input, "/data");
+  EXPECT_THAT(paths, testing::ElementsAre("year=2019", "year=2019/month=12",
+                                          "year=2019/month=12/day=01"));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc
index 83ff069ecf..04484dd5bd 100644
--- a/cpp/src/arrow/filesystem/path_util.cc
+++ b/cpp/src/arrow/filesystem/path_util.cc
@@ -34,12 +34,12 @@ namespace internal {
 std::vector<std::string> SplitAbstractPath(const std::string& path, char sep) {
   std::vector<std::string> parts;
   auto v = util::string_view(path);
-  // Strip trailing slash
-  if (v.length() > 0 && v.back() == kSep) {
+  // Strip trailing separator
+  if (v.length() > 0 && v.back() == sep) {
     v = v.substr(0, v.length() - 1);
   }
-  // Strip leading slash
-  if (v.length() > 0 && v.front() == kSep) {
+  // Strip leading separator
+  if (v.length() > 0 && v.front() == sep) {
     v = v.substr(1);
   }
   if (v.length() == 0) {
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 2933b12ea3..6c1b8db5a6 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -220,7 +220,6 @@ def partitioning(schema=None, field_names=None, flavor=None,
                 "For the default directory flavor, need to specify "
                 "a Schema or a list of field names")
     if flavor == "filename":
-        # default flavor
         if schema is not None:
             if field_names is not None:
                 raise ValueError(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 3bc905f2f9..288301fd4b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -582,7 +582,7 @@ def test_partitioning():
     )
     assert len(partitioning.dictionaries) == 2
     assert all(x is None for x in partitioning.dictionaries)
-    expr = partitioning.parse('/3/3.14')
+    expr = partitioning.parse('/3/3.14/')
     assert isinstance(expr, ds.Expression)
 
     expected = (ds.field('group') == 3) & (ds.field('key') == 3.14)
@@ -591,7 +591,7 @@ def test_partitioning():
     with pytest.raises(pa.ArrowInvalid):
         partitioning.parse('/prefix/3/aaa')
 
-    expr = partitioning.parse('/3')
+    expr = partitioning.parse('/3/')
     expected = ds.field('group') == 3
     assert expr.equals(expected)
 
@@ -604,20 +604,20 @@ def test_partitioning():
     )
     assert len(partitioning.dictionaries) == 2
     assert all(x is None for x in partitioning.dictionaries)
-    expr = partitioning.parse('/alpha=0/beta=3')
+    expr = partitioning.parse('/alpha=0/beta=3/')
     expected = (
         (ds.field('alpha') == ds.scalar(0)) &
         (ds.field('beta') == ds.scalar(3))
     )
     assert expr.equals(expected)
 
-    expr = partitioning.parse('/alpha=xyz/beta=3')
+    expr = partitioning.parse('/alpha=xyz/beta=3/')
     expected = (
         (ds.field('alpha').is_null() & (ds.field('beta') == ds.scalar(3)))
     )
     assert expr.equals(expected)
 
-    for shouldfail in ['/alpha=one/beta=2', '/alpha=one', '/beta=two']:
+    for shouldfail in ['/alpha=one/beta=2/', '/alpha=one/', '/beta=two/']:
         with pytest.raises(pa.ArrowInvalid):
             partitioning.parse(shouldfail)
 
@@ -662,6 +662,24 @@ def test_partitioning():
     assert partitioning.dictionaries[1].to_pylist() == [
         "first", "second", "third"]
 
+    # test partitioning roundtrip
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))],
+        names=["f1", "f2", "part"]
+    )
+    partitioning_schema = pa.schema([("part", pa.string())])
+    for klass in [ds.DirectoryPartitioning, ds.HivePartitioning,
+                  ds.FilenamePartitioning]:
+        with tempfile.TemporaryDirectory() as tempdir:
+            partitioning = klass(partitioning_schema)
+            ds.write_dataset(table, tempdir,
+                             format='ipc', partitioning=partitioning)
+            load_back = ds.dataset(tempdir, format='ipc',
+                                   partitioning=partitioning)
+            load_back_table = load_back.to_table()
+            assert load_back_table.equals(table)
+
 
 def test_expression_arithmetic_operators():
     dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]}))