You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2021/01/15 16:44:36 UTC

[arrow] branch master updated: ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa7b7a  ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns
eaa7b7a is described below

commit eaa7b7af79fc543c2603a8ef37553061097dab87
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Fri Jan 15 11:43:50 2021 -0500

    ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns
    
    Enables usage of dictionary columns as partition columns on write.
    
    Additionally resolves some partition-related follow-ups from #8894 (@pitrou):
    - raise an error status [instead of aborting](https://github.com/apache/arrow/pull/8894/#discussion_r545219042) for overflowing maximum group count
    - handle dictionary index types [other than int32](https://github.com/apache/arrow/pull/8894/#discussion_r545215901)
    - don't build an unused null bitmap [in CountsToOffsets](https://github.com/apache/arrow/pull/8894/#discussion_r545212237)
    - improve docstrings for [MakeGroupings, ApplyGroupings](https://github.com/apache/arrow/pull/8894/#discussion_r545209568)
    
    At some point, we'll probably want to support null grouping criteria. (For now, this PR adds a test asserting that nulls in any grouping column raise an error.) This will require adding an option/overload/... of dictionary_encode which places nulls in the dictionary instead of the indices, and ensuring Partitionings can format nulls appropriately. This would allow users to write a partitioned dataset which preserves nulls sensibly:
    
    ```
    data/
        col=a/
            part-0.parquet # col is "a" throughout
        col=b/
            part-1.parquet # col is "b" throughout
        part-2.parquet # col is null throughout
    ```
    
    Closes #9130 from bkietz/10247-Cannot-write-dataset-with
    
    Lead-authored-by: Benjamin Kietzman <be...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/dataset/expression.cc          |  57 +++++++-------
 cpp/src/arrow/dataset/file_base.cc           |  12 ++-
 cpp/src/arrow/dataset/file_base.h            |   3 +
 cpp/src/arrow/dataset/file_ipc_test.cc       |  12 +++
 cpp/src/arrow/dataset/partition.cc           | 109 ++++++++++++++++++---------
 cpp/src/arrow/dataset/partition.h            |  56 ++++++++++++--
 cpp/src/arrow/dataset/partition_test.cc      |  61 +++++++++++++++
 python/pyarrow/_dataset.pyx                  |  57 +++++++++++---
 python/pyarrow/dataset.py                    |  21 +++++-
 python/pyarrow/includes/libarrow_dataset.pxd |   7 +-
 python/pyarrow/tests/test_dataset.py         |  50 ++++++++++++
 testing                                      |   2 +-
 12 files changed, 356 insertions(+), 91 deletions(-)

diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/dataset/expression.cc
index 296fb22..e7acac3 100644
--- a/cpp/src/arrow/dataset/expression.cc
+++ b/cpp/src/arrow/dataset/expression.cc
@@ -90,27 +90,35 @@ ValueDescr Expression::descr() const {
   return CallNotNull(*this)->descr;
 }
 
+namespace {
+
+std::string PrintDatum(const Datum& datum) {
+  if (datum.is_scalar()) {
+    switch (datum.type()->id()) {
+      case Type::STRING:
+      case Type::LARGE_STRING:
+        return '"' +
+               Escape(util::string_view(*datum.scalar_as<BaseBinaryScalar>().value)) +
+               '"';
+
+      case Type::BINARY:
+      case Type::FIXED_SIZE_BINARY:
+      case Type::LARGE_BINARY:
+        return '"' + datum.scalar_as<BaseBinaryScalar>().value->ToHexString() + '"';
+
+      default:
+        break;
+    }
+    return datum.scalar()->ToString();
+  }
+  return datum.ToString();
+}
+
+}  // namespace
+
 std::string Expression::ToString() const {
   if (auto lit = literal()) {
-    if (lit->is_scalar()) {
-      switch (lit->type()->id()) {
-        case Type::STRING:
-        case Type::LARGE_STRING:
-          return '"' +
-                 Escape(util::string_view(*lit->scalar_as<BaseBinaryScalar>().value)) +
-                 '"';
-
-        case Type::BINARY:
-        case Type::FIXED_SIZE_BINARY:
-        case Type::LARGE_BINARY:
-          return '"' + lit->scalar_as<BaseBinaryScalar>().value->ToHexString() + '"';
-
-        default:
-          break;
-      }
-      return lit->scalar()->ToString();
-    }
-    return lit->ToString();
+    return PrintDatum(*lit);
   }
 
   if (auto ref = field_ref()) {
@@ -763,16 +771,7 @@ Status ExtractKnownFieldValuesImpl(
     auto ref = call->arguments[0].field_ref();
     auto lit = call->arguments[1].literal();
 
-    auto it_success = known_values->emplace(*ref, *lit);
-    if (it_success.second) continue;
-
-    // A value was already known for ref; check it
-    auto ref_lit = it_success.first;
-    if (*lit != ref_lit->second) {
-      return Status::Invalid("Conflicting guarantees: (", ref->ToString(),
-                             " == ", lit->ToString(), ") vs (", ref->ToString(),
-                             " == ", ref_lit->second.ToString());
-    }
+    known_values->emplace(*ref, *lit);
   }
 
   conjunction_members->erase(unconsumed_end, conjunction_members->end());
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2c437ce..612c249 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -313,12 +313,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
         ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
         batch.reset();  // drop to hopefully conserve memory
 
+        if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) {
+          return Status::Invalid("Fragment would be written into ", groups.batches.size(),
+                                 " partitions. This exceeds the maximum of ",
+                                 write_options.max_partitions);
+        }
+
         std::unordered_set<WriteQueue*> need_flushed;
         for (size_t i = 0; i < groups.batches.size(); ++i) {
-          ARROW_ASSIGN_OR_RAISE(
-              auto partition_expression,
-              and_(std::move(groups.expressions[i]), fragment->partition_expression())
-                  .Bind(*scanner->schema()));
+          auto partition_expression =
+              and_(std::move(groups.expressions[i]), fragment->partition_expression());
           auto batch = std::move(groups.batches[i]);
 
           ARROW_ASSIGN_OR_RAISE(auto part,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index bb2aa86..708f7e0 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -295,6 +295,9 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// Partitioning used to generate fragment paths.
   std::shared_ptr<Partitioning> partitioning;
 
+  /// Maximum number of partitions any batch may be written into, default is 1K.
+  int max_partitions = 1024;
+
   /// Template string used to generate fragment basenames.
   /// {i} will be replaced by an auto incremented integer.
   std::string basename_template;
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 4242113..b8428e0 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -223,6 +223,18 @@ TEST_F(TestIpcFileSystemDataset, WriteWithEmptyPartitioningSchema) {
   TestWriteWithEmptyPartitioningSchema();
 }
 
+TEST_F(TestIpcFileSystemDataset, WriteExceedsMaxPartitions) {
+  write_options_.partitioning = std::make_shared<DirectoryPartitioning>(
+      SchemaFromColumnNames(source_schema_, {"model"}));
+
+  // require that no batch be grouped into more than 2 written batches:
+  write_options_.max_partitions = 2;
+
+  auto scanner = std::make_shared<Scanner>(dataset_, scan_options_, scan_context_);
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("This exceeds the maximum"),
+                                  FileSystemDataset::Write(write_options_, scanner));
+}
+
 TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) {
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
   auto result = format_->Inspect(FileSource(buf));
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 3a164d8..d6a3723 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "arrow/array/array_base.h"
+#include "arrow/array/array_dict.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/array/builder_dict.h"
 #include "arrow/compute/api_scalar.h"
@@ -191,7 +192,7 @@ Result<Expression> KeyValuePartitioning::Parse(const std::string& path) const {
 }
 
 Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
-  std::vector<Scalar*> values{static_cast<size_t>(schema_->num_fields()), nullptr};
+  ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};
 
   ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr));
   for (const auto& ref_value : known_values) {
@@ -202,7 +203,7 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
     ARROW_ASSIGN_OR_RAISE(auto match, ref_value.first.FindOneOrNone(*schema_));
     if (match.empty()) continue;
 
-    const auto& value = ref_value.second.scalar();
+    auto value = ref_value.second.scalar();
 
     const auto& field = schema_->field(match[0]);
     if (!value->type->Equals(field->type())) {
@@ -210,7 +211,12 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
                                ") is invalid for ", field->ToString());
     }
 
-    values[match[0]] = value.get();
+    if (value->type->id() == Type::DICTIONARY) {
+      ARROW_ASSIGN_OR_RAISE(
+          value, checked_cast<const DictionaryScalar&>(*value).GetEncodedValue());
+    }
+
+    values[match[0]] = std::move(value);
   }
 
   return FormatValues(values);
@@ -230,9 +236,9 @@ std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
   return keys;
 }
 
-inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int first_null) {
+inline util::optional<int> NextValid(const ScalarVector& values, int first_null) {
   auto it = std::find_if(values.begin() + first_null + 1, values.end(),
-                         [](Scalar* v) { return v != nullptr; });
+                         [](const std::shared_ptr<Scalar>& v) { return v != nullptr; });
 
   if (it == values.end()) {
     return util::nullopt;
@@ -242,7 +248,7 @@ inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int fir
 }
 
 Result<std::string> DirectoryPartitioning::FormatValues(
-    const std::vector<Scalar*>& values) const {
+    const ScalarVector& values) const {
   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
 
   for (int i = 0; i < schema_->num_fields(); ++i) {
@@ -426,8 +432,7 @@ std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
   return keys;
 }
 
-Result<std::string> HivePartitioning::FormatValues(
-    const std::vector<Scalar*>& values) const {
+Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) const {
   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
 
   for (int i = 0; i < schema_->num_fields(); ++i) {
@@ -532,19 +537,21 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
 
 // Transform an array of counts to offsets which will divide a ListArray
 // into an equal number of slices with corresponding lengths.
-inline Result<std::shared_ptr<Array>> CountsToOffsets(
+inline Result<std::shared_ptr<Buffer>> CountsToOffsets(
     std::shared_ptr<Int64Array> counts) {
-  Int32Builder offset_builder;
+  TypedBufferBuilder<int32_t> offset_builder;
   RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
-  offset_builder.UnsafeAppend(0);
+
+  int32_t current_offset = 0;
+  offset_builder.UnsafeAppend(current_offset);
 
   for (int64_t i = 0; i < counts->length(); ++i) {
     DCHECK_NE(counts->Value(i), 0);
-    auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i));
-    offset_builder.UnsafeAppend(next_offset);
+    current_offset += static_cast<int32_t>(counts->Value(i));
+    offset_builder.UnsafeAppend(current_offset);
   }
 
-  std::shared_ptr<Array> offsets;
+  std::shared_ptr<Buffer> offsets;
   RETURN_NOT_OK(offset_builder.Finish(&offsets));
   return offsets;
 }
@@ -604,6 +611,12 @@ class StructDictionary {
       RETURN_NOT_OK(builders[i].FinishInternal(&indices));
 
       ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices));
+
+      if (fields[i]->type()->id() == Type::DICTIONARY) {
+        RETURN_NOT_OK(RestoreDictionaryEncoding(
+            checked_pointer_cast<DictionaryType>(fields[i]->type()), &column));
+      }
+
       columns[i] = column.make_array();
     }
 
@@ -612,27 +625,22 @@ class StructDictionary {
 
  private:
   Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
-    ArrayData* encoded;
     if (column.type()->id() != Type::DICTIONARY) {
-      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column));
+      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(std::move(column)));
     }
-    encoded = column.mutable_array();
-
-    auto indices =
-        std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1]));
 
-    dictionaries_.push_back(MakeArray(std::move(encoded->dictionary)));
-    auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length());
+    auto dict_column = column.array_as<DictionaryArray>();
+    dictionaries_.push_back(dict_column->dictionary());
+    ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), int32()));
 
     if (*fused_indices == nullptr) {
-      *fused_indices = std::move(indices);
-      size_ = dictionary_size;
-      return Status::OK();
+      *fused_indices = checked_pointer_cast<Int32Array>(std::move(indices));
+      return IncreaseSize();
     }
 
     // It's useful to think about the case where each of dictionaries_ has size 10.
     // In this case the decimal digit in the ones place is the code in dictionaries_[0],
-    // the tens place corresponds to dictionaries_[1], etc.
+    // the tens place corresponds to the code in dictionaries_[1], etc.
     // The incumbent indices must be shifted to the hundreds place so as not to collide.
     ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
                           compute::Multiply(indices, MakeScalar(size_)));
@@ -641,10 +649,7 @@ class StructDictionary {
                           compute::Add(new_fused_indices, *fused_indices));
 
     *fused_indices = checked_pointer_cast<Int32Array>(new_fused_indices.make_array());
-
-    // XXX should probably cap this at 2**15 or so
-    ARROW_CHECK(!internal::MultiplyWithOverflow(size_, dictionary_size, &size_));
-    return Status::OK();
+    return IncreaseSize();
   }
 
   // expand a fused code into component dict codes, order is in order of addition
@@ -656,13 +661,48 @@ class StructDictionary {
     }
   }
 
-  int32_t size_;
+  Status RestoreDictionaryEncoding(std::shared_ptr<DictionaryType> expected_type,
+                                   Datum* column) {
+    DCHECK_NE(column->type()->id(), Type::DICTIONARY);
+    ARROW_ASSIGN_OR_RAISE(*column, compute::DictionaryEncode(std::move(*column)));
+
+    if (expected_type->index_type()->id() == Type::INT32) {
+      // dictionary_encode has already yielded the expected index_type
+      return Status::OK();
+    }
+
+    // cast the indices to the expected index type
+    auto dictionary = std::move(column->mutable_array()->dictionary);
+    column->mutable_array()->type = int32();
+
+    ARROW_ASSIGN_OR_RAISE(*column,
+                          compute::Cast(std::move(*column), expected_type->index_type()));
+
+    column->mutable_array()->dictionary = std::move(dictionary);
+    column->mutable_array()->type = expected_type;
+    return Status::OK();
+  }
+
+  Status IncreaseSize() {
+    auto factor = static_cast<int32_t>(dictionaries_.back()->length());
+
+    if (internal::MultiplyWithOverflow(size_, factor, &size_)) {
+      return Status::CapacityError("Max groups exceeded");
+    }
+    return Status::OK();
+  }
+
+  int32_t size_ = 1;
   ArrayVector dictionaries_;
 };
 
 Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) {
   if (by.num_fields() == 0) {
-    return Status::NotImplemented("Grouping with no criteria");
+    return Status::Invalid("Grouping with no criteria");
+  }
+
+  if (by.null_count() != 0) {
+    return Status::Invalid("Grouping with null criteria");
   }
 
   ARROW_ASSIGN_OR_RAISE(auto fused, StructDictionary::Encode(by.fields()));
@@ -685,8 +725,9 @@ Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) {
       checked_pointer_cast<Int64Array>(fused_counts_and_values->GetFieldByName("counts"));
   ARROW_ASSIGN_OR_RAISE(auto offsets, CountsToOffsets(std::move(counts)));
 
-  ARROW_ASSIGN_OR_RAISE(auto grouped_sort_indices,
-                        ListArray::FromArrays(*offsets, *sort_indices));
+  auto grouped_sort_indices =
+      std::make_shared<ListArray>(list(sort_indices->type()), unique_rows->length(),
+                                  std::move(offsets), std::move(sort_indices));
 
   return StructArray::Make(
       ArrayVector{std::move(unique_rows), std::move(grouped_sort_indices)},
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 8975f56..944434e 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -142,7 +142,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
 
   virtual std::vector<Key> ParseKeys(const std::string& path) const = 0;
 
-  virtual Result<std::string> FormatValues(const std::vector<Scalar*>& values) const = 0;
+  virtual Result<std::string> FormatValues(const ScalarVector& values) const = 0;
 
   /// Convert a Key to a full expression.
   Result<Expression> ConvertKey(const Key& key) const;
@@ -172,7 +172,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
  private:
   std::vector<Key> ParseKeys(const std::string& path) const override;
 
-  Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override;
+  Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
 
 /// \brief Multi-level, directory based partitioning
@@ -201,7 +201,7 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
  private:
   std::vector<Key> ParseKeys(const std::string& path) const override;
 
-  Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override;
+  Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
 
 /// \brief Implementation provided by lambda or other callable
@@ -288,16 +288,58 @@ class ARROW_DS_EXPORT PartitioningOrFactory {
 /// \brief Assemble lists of indices of identical rows.
 ///
 /// \param[in] by A StructArray whose columns will be used as grouping criteria.
-/// \return A StructArray mapping unique rows (in field "values", represented as a
-///         StructArray with the same fields as `by`) to lists of indices where
-///         that row appears (in field "groupings").
+///               Top level nulls are invalid, as are empty criteria (no grouping
+///               columns).
+/// \return An array of type `struct<values: by.type, groupings: list<int64>>`,
+///         which is a mapping from unique rows (field "values") to lists of
+///         indices into `by` where that row appears (field "groupings").
+///
+/// For example,
+///   MakeGroupings([
+///       {"a": "ex",  "b": 0},
+///       {"a": "ex",  "b": 0},
+///       {"a": "why", "b": 0},
+///       {"a": "why", "b": 0},
+///       {"a": "ex",  "b": 0},
+///       {"a": "why", "b": 1}
+///   ]) == [
+///       {"values": {"a": "ex",  "b": 0}, "groupings": [0, 1, 4]},
+///       {"values": {"a": "why", "b": 0}, "groupings": [2, 3]},
+///       {"values": {"a": "why", "b": 1}, "groupings": [5]}
+///   ]
 ARROW_DS_EXPORT
 Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by);
 
-/// \brief Produce slices of an Array which correspond to the provided groupings.
+/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
+/// the provided groupings.
+///
+/// For example,
+///   ApplyGroupings([[0, 1, 4], [2, 3], [5]], [
+///       {"a": "ex",  "b": 0, "passenger": 0},
+///       {"a": "ex",  "b": 0, "passenger": 1},
+///       {"a": "why", "b": 0, "passenger": 2},
+///       {"a": "why", "b": 0, "passenger": 3},
+///       {"a": "ex",  "b": 0, "passenger": 4},
+///       {"a": "why", "b": 1, "passenger": 5}
+///   ]) == [
+///     [
+///       {"a": "ex",  "b": 0, "passenger": 0},
+///       {"a": "ex",  "b": 0, "passenger": 1},
+///       {"a": "ex",  "b": 0, "passenger": 4},
+///     ],
+///     [
+///       {"a": "why", "b": 0, "passenger": 2},
+///       {"a": "why", "b": 0, "passenger": 3},
+///     ],
+///     [
+///       {"a": "why", "b": 1, "passenger": 5}
+///     ]
+///   ]
 ARROW_DS_EXPORT
 Result<std::shared_ptr<ListArray>> ApplyGroupings(const ListArray& groupings,
                                                   const Array& array);
+
+/// \brief Produce selections of a RecordBatch which correspond to the provided groupings.
 ARROW_DS_EXPORT
 Result<RecordBatchVector> ApplyGroupings(const ListArray& groupings,
                                          const std::shared_ptr<RecordBatch>& batch);
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 2260eb2..286848d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -32,6 +32,7 @@
 #include "arrow/filesystem/path_util.h"
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/range.h"
 
 namespace arrow {
 using internal::checked_pointer_cast;
@@ -152,6 +153,16 @@ TEST_F(TestPartitioning, DirectoryPartitioningFormat) {
            equal(field_ref("beta"), literal("hello"))));
 }
 
+TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionary) {
+  auto dictionary = ArrayFromJSON(utf8(), R"(["hello", "world"])");
+  partitioning_ = std::make_shared<DirectoryPartitioning>(schema({DictStr("alpha")}),
+                                                          ArrayVector{dictionary});
+  written_schema_ = partitioning_->schema();
+
+  ASSERT_OK_AND_ASSIGN(auto dict_hello, MakeScalar("hello")->CastTo(DictStr("")->type()));
+  AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
+}
+
 TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) {
   for (auto temporal : {timestamp(TimeUnit::SECOND), date32()}) {
     partitioning_ = std::make_shared<DirectoryPartitioning>(
@@ -549,12 +560,14 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json,
                                     }));
 
   ASSERT_OK_AND_ASSIGN(auto groupings_and_values, MakeGroupings(*by));
+  ASSERT_OK(groupings_and_values->ValidateFull());
 
   auto groupings =
       checked_pointer_cast<ListArray>(groupings_and_values->GetFieldByName("groupings"));
 
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> grouped_ids,
                        ApplyGroupings(*groupings, *batch->GetColumnByName("id")));
+  ASSERT_OK(grouped_ids->ValidateFull());
 
   ArrayVector columns =
       checked_cast<const StructArray&>(*groupings_and_values->GetFieldByName("values"))
@@ -562,6 +575,7 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json,
   columns.push_back(grouped_ids);
 
   ASSERT_OK_AND_ASSIGN(auto actual, StructArray::Make(columns, fields_with_ids));
+  ASSERT_OK(actual->ValidateFull());
 
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
@@ -585,5 +599,52 @@ TEST(GroupTest, Basics) {
   ])");
 }
 
+TEST(GroupTest, WithNulls) {
+  auto has_nulls = checked_pointer_cast<StructArray>(
+      ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([
+    {"a": "ex",  "b": 0},
+    {"a": null,  "b": 0},
+    {"a": "why", "b": 0},
+    {"a": "ex",  "b": 1},
+    {"a": "why", "b": 0},
+    {"a": "ex",  "b": 1},
+    {"a": "ex",  "b": 0},
+    {"a": "why", "b": null}
+  ])"));
+  ASSERT_RAISES(NotImplemented, MakeGroupings(*has_nulls));
+
+  has_nulls = checked_pointer_cast<StructArray>(
+      ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([
+    {"a": "ex",  "b": 0},
+    null,
+    {"a": "why", "b": 0},
+    {"a": "ex",  "b": 1},
+    {"a": "why", "b": 0},
+    {"a": "ex",  "b": 1},
+    {"a": "ex",  "b": 0},
+    null
+  ])"));
+  ASSERT_RAISES(Invalid, MakeGroupings(*has_nulls));
+}
+
+TEST(GroupTest, GroupOnDictionary) {
+  AssertGrouping({field("a", dictionary(int32(), utf8())), field("b", int32())}, R"([
+    {"a": "ex",  "b": 0, "id": 0},
+    {"a": "ex",  "b": 0, "id": 1},
+    {"a": "why", "b": 0, "id": 2},
+    {"a": "ex",  "b": 1, "id": 3},
+    {"a": "why", "b": 0, "id": 4},
+    {"a": "ex",  "b": 1, "id": 5},
+    {"a": "ex",  "b": 0, "id": 6},
+    {"a": "why", "b": 1, "id": 7}
+  ])",
+                 R"([
+    {"a": "ex",  "b": 0, "ids": [0, 1, 6]},
+    {"a": "why", "b": 0, "ids": [2, 4]},
+    {"a": "ex",  "b": 1, "ids": [3, 5]},
+    {"a": "why", "b": 1, "ids": [7]}
+  ])");
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 410ca12..151ae81 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1403,6 +1403,25 @@ cdef class PartitioningFactory(_Weakrefable):
         return self.wrapped
 
 
+cdef vector[shared_ptr[CArray]] _partitioning_dictionaries(
+        Schema schema, dictionaries) except *:
+    cdef:
+        vector[shared_ptr[CArray]] c_dictionaries
+
+    dictionaries = dictionaries or {}
+
+    for field in schema:
+        dictionary = dictionaries.get(field.name)
+
+        if (isinstance(field.type, pa.DictionaryType) and
+                dictionary is not None):
+            c_dictionaries.push_back(pyarrow_unwrap_array(dictionary))
+        else:
+            c_dictionaries.push_back(<shared_ptr[CArray]> nullptr)
+
+    return c_dictionaries
+
+
 cdef class DirectoryPartitioning(Partitioning):
     """
     A Partitioning based on a specified Schema.
@@ -1416,6 +1435,11 @@ cdef class DirectoryPartitioning(Partitioning):
     ----------
     schema : Schema
         The schema that describes the partitions present in the file path.
+    dictionaries : Dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
 
     Returns
     -------
@@ -1433,12 +1457,15 @@ cdef class DirectoryPartitioning(Partitioning):
     cdef:
         CDirectoryPartitioning* directory_partitioning
 
-    def __init__(self, Schema schema not None):
-        cdef shared_ptr[CDirectoryPartitioning] partitioning
-        partitioning = make_shared[CDirectoryPartitioning](
-            pyarrow_unwrap_schema(schema)
+    def __init__(self, Schema schema not None, dictionaries=None):
+        cdef:
+            shared_ptr[CDirectoryPartitioning] c_partitioning
+
+        c_partitioning = make_shared[CDirectoryPartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries)
         )
-        self.init(<shared_ptr[CPartitioning]> partitioning)
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
     cdef init(self, const shared_ptr[CPartitioning]& sp):
         Partitioning.init(self, sp)
@@ -1506,6 +1533,11 @@ cdef class HivePartitioning(Partitioning):
     ----------
     schema : Schema
         The schema that describes the partitions present in the file path.
+    dictionaries : Dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
 
     Returns
     -------
@@ -1524,12 +1556,15 @@ cdef class HivePartitioning(Partitioning):
     cdef:
         CHivePartitioning* hive_partitioning
 
-    def __init__(self, Schema schema not None):
-        cdef shared_ptr[CHivePartitioning] partitioning
-        partitioning = make_shared[CHivePartitioning](
-            pyarrow_unwrap_schema(schema)
+    def __init__(self, Schema schema not None, dictionaries=None):
+        cdef:
+            shared_ptr[CHivePartitioning] c_partitioning
+
+        c_partitioning = make_shared[CHivePartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries)
         )
-        self.init(<shared_ptr[CPartitioning]> partitioning)
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
     cdef init(self, const shared_ptr[CPartitioning]& sp):
         Partitioning.init(self, sp)
@@ -2270,6 +2305,7 @@ def _filesystemdataset_write(
     Schema schema not None, FileSystem filesystem not None,
     Partitioning partitioning not None,
     FileWriteOptions file_options not None, bint use_threads,
+    int max_partitions,
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -2283,6 +2319,7 @@ def _filesystemdataset_write(
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
+    c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
 
     if isinstance(data, Dataset):
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index adeb8ae..a7aa9c4 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -88,7 +88,8 @@ def scalar(value):
     return Expression._scalar(value)
 
 
-def partitioning(schema=None, field_names=None, flavor=None):
+def partitioning(schema=None, field_names=None, flavor=None,
+                 dictionaries=None):
     """
     Specify a partitioning scheme.
 
@@ -121,6 +122,11 @@ def partitioning(schema=None, field_names=None, flavor=None):
     flavor : str, default None
         The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
         a HivePartitioning.
+    dictionaries : Dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
 
     Returns
     -------
@@ -158,7 +164,7 @@ def partitioning(schema=None, field_names=None, flavor=None):
             if field_names is not None:
                 raise ValueError(
                     "Cannot specify both 'schema' and 'field_names'")
-            return DirectoryPartitioning(schema)
+            return DirectoryPartitioning(schema, dictionaries)
         elif field_names is not None:
             if isinstance(field_names, list):
                 return DirectoryPartitioning.discover(field_names)
@@ -175,7 +181,7 @@ def partitioning(schema=None, field_names=None, flavor=None):
             raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
         elif schema is not None:
             if isinstance(schema, pa.Schema):
-                return HivePartitioning(schema)
+                return HivePartitioning(schema, dictionaries)
             else:
                 raise ValueError(
                     "Expected Schema for 'schema', got {}".format(
@@ -635,7 +641,8 @@ def _ensure_write_partitioning(scheme):
 
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, schema=None,
-                  filesystem=None, file_options=None, use_threads=True):
+                  filesystem=None, file_options=None, use_threads=True,
+                  max_partitions=None):
     """
     Write a dataset to a given format and partitioning.
 
@@ -668,6 +675,8 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will be
         used determined by the number of available CPU cores.
+    max_partitions : int, default 1024
+        Maximum number of partitions any batch may be written into.
     """
     from pyarrow.fs import LocalFileSystem, _ensure_filesystem
 
@@ -700,6 +709,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
     if basename_template is None:
         basename_template = "part-{i}." + format.default_extname
 
+    if max_partitions is None:
+        max_partitions = 1024
+
     partitioning = _ensure_write_partitioning(partitioning)
 
     if filesystem is None:
@@ -711,4 +723,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
     _filesystemdataset_write(
         data, base_dir, basename_template, schema,
         filesystem, partitioning, file_options, use_threads,
+        max_partitions
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 73803c0..362967d 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -213,6 +213,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         shared_ptr[CFileSystem] filesystem
         c_string base_dir
         shared_ptr[CPartitioning] partitioning
+        int max_partitions
         c_string basename_template
 
     cdef cppclass CFileSystemDataset \
@@ -277,7 +278,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CDirectoryPartitioning \
             "arrow::dataset::DirectoryPartitioning"(CPartitioning):
-        CDirectoryPartitioning(shared_ptr[CSchema] schema)
+        CDirectoryPartitioning(shared_ptr[CSchema] schema,
+                               vector[shared_ptr[CArray]] dictionaries)
 
         @staticmethod
         shared_ptr[CPartitioningFactory] MakeFactory(
@@ -285,7 +287,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CHivePartitioning \
             "arrow::dataset::HivePartitioning"(CPartitioning):
-        CHivePartitioning(shared_ptr[CSchema] schema)
+        CHivePartitioning(shared_ptr[CSchema] schema,
+                          vector[shared_ptr[CArray]] dictionaries)
 
         @staticmethod
         shared_ptr[CPartitioningFactory] MakeFactory(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 0ab9d95..c3ad83b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2317,6 +2317,32 @@ def test_write_dataset_partitioned(tempdir):
 
 @pytest.mark.parquet
 @pytest.mark.pandas
+def test_write_dataset_partitioned_dict(tempdir):
+    directory = tempdir / "partitioned"
+    _ = _create_parquet_dataset_partitioned(directory)
+
+    # hive source with inferred dictionary column -> directory partitioning
+    dataset = ds.dataset(
+        directory,
+        partitioning=ds.HivePartitioning.discover(infer_dictionary=True))
+    target = tempdir / 'partitioned-dir-target'
+    expected_paths = [
+        target / "a", target / "a" / "part-0.feather",
+        target / "b", target / "b" / "part-1.feather"
+    ]
+    partitioning = ds.partitioning(pa.schema([
+        dataset.schema.field('part')]),
+        dictionaries={'part': pa.array(['a', 'b'])})
+    # NB: the dictionaries are needed because _check_dataset_roundtrip also
+    # uses this partitioning to parse the written directories; formatting
+    # paths on write does not currently require them
+    _check_dataset_roundtrip(
+        dataset, str(target), expected_paths, target,
+        partitioning=partitioning)
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
 def test_write_dataset_use_threads(tempdir):
     directory = tempdir / "partitioned"
     _ = _create_parquet_dataset_partitioned(directory)
@@ -2412,6 +2438,30 @@ def test_write_table_multiple_fragments(tempdir):
     )
 
 
+def test_write_table_partitioned_dict(tempdir):
+    # writing a table partitioned on a dictionary column must work even when
+    # the partitioning is built without explicit dictionary values
+    table = pa.table([
+        pa.array(range(20)),
+        pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(),
+    ], names=['col', 'part'])
+
+    partitioning = ds.partitioning(table.select(["part"]).schema)
+
+    base_dir = tempdir / "dataset"
+    ds.write_dataset(
+        table, base_dir, format="feather", partitioning=partitioning
+    )
+
+    # read back, inferring the dictionary partition column, and compare
+    partitioning_read = ds.DirectoryPartitioning.discover(
+        ["part"], infer_dictionary=True)
+    result = ds.dataset(
+        base_dir, format="ipc", partitioning=partitioning_read
+    ).to_table()
+    assert result.equals(table)
+
+
 @pytest.mark.parquet
 def test_write_dataset_parquet(tempdir):
     import pyarrow.parquet as pq
diff --git a/testing b/testing
index b4eeafd..d6c4deb 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit b4eeafdec6fb5284c4aaf269f2ebdb3be2c63ed5
+Subproject commit d6c4deb22c4b4e9e3247a2f291046e3c671ad235