Posted to commits@arrow.apache.org by bk...@apache.org on 2021/01/15 16:44:36 UTC
[arrow] branch master updated: ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns
This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new eaa7b7a ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns
eaa7b7a is described below
commit eaa7b7af79fc543c2603a8ef37553061097dab87
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Fri Jan 15 11:43:50 2021 -0500
ARROW-10247: [C++][Dataset] Support writing datasets partitioned on dictionary columns
Enables the use of dictionary columns as partition columns on write.
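For orientation, here is a minimal PyArrow sketch of the new behavior, adapted from the `test_write_table_partitioned_dict` test added in this patch (the directory names are illustrative):

```python
import numpy as np
import pyarrow as pa
import pyarrow.dataset as ds

# A table whose "part" column is dictionary-encoded.
table = pa.table([
    pa.array(range(20)),
    pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(),
], names=['col', 'part'])

# Partition on the dictionary column; the dictionary values need not be
# listed explicitly for the write/format step.
partitioning = ds.partitioning(table.select(['part']).schema)
ds.write_dataset(table, 'dataset_root', format='feather',
                 partitioning=partitioning)

# Read back, rediscovering the partition column as dictionary-encoded.
read_partitioning = ds.DirectoryPartitioning.discover(
    ['part'], infer_dictionary=True)
round_tripped = ds.dataset('dataset_root', format='ipc',
                           partitioning=read_partitioning).to_table()
```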
Additionally resolves some partition-related follow-ups from #8894 (@pitrou):
- raise an error status [instead of aborting](https://github.com/apache/arrow/pull/8894/#discussion_r545219042) for overflowing maximum group count
- handle dictionary index types [other than int32](https://github.com/apache/arrow/pull/8894/#discussion_r545215901)
- don't build an unused null bitmap [in CountsToOffsets](https://github.com/apache/arrow/pull/8894/#discussion_r545212237)
- improve docstrings for [MakeGroupings, ApplyGroupings](https://github.com/apache/arrow/pull/8894/#discussion_r545209568)
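This patch also adds a `max_partitions` cap (default 1024) on how many partitions any single batch may be written into, surfaced as a new keyword on `write_dataset`; exceeding the cap returns an Invalid status rather than writing an unbounded number of files. A hedged Python sketch (directory name and exact error text are illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Ten distinct partition keys, but only two partitions allowed per batch.
table = pa.table({'part': [str(i) for i in range(10)],
                  'value': list(range(10))})

try:
    ds.write_dataset(
        table, 'capped_root', format='feather',
        partitioning=ds.partitioning(pa.schema([('part', pa.string())])),
        max_partitions=2)
except pa.ArrowInvalid as exc:
    print(exc)  # mentions "This exceeds the maximum of 2"
```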
At some point, we'll probably want to support null grouping criteria. (For now, this PR adds a test asserting that nulls in any grouping column raise an error.) This will require adding an option/overload/... of `dictionary_encode` which places nulls in the dictionary instead of the indices, and ensuring Partitionings can format nulls appropriately. This would allow users to write a partitioned dataset which preserves nulls sensibly:
```
data/
col=a/
part-0.parquet # col is "a" throughout
col=b/
part-1.parquet # col is "b" throughout
part-2.parquet # col is null throughout
```
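Separately, the Python bindings gain a `dictionaries` argument on `partitioning()` and on the `DirectoryPartitioning`/`HivePartitioning` constructors: when a partition field is dictionary-typed, the values it may take must be supplied up front so that paths can be parsed back into dictionary values (needed for reading, not for the formatting step). A rough sketch based on the new `test_write_dataset_partitioned_dict` test (the path is illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Declare the partition field as dictionary-typed and hand the
# partitioning every value it may encounter in directory names.
part_field = pa.field('part', pa.dictionary(pa.int32(), pa.string()))
partitioning = ds.partitioning(
    pa.schema([part_field]),
    dictionaries={'part': pa.array(['a', 'b'])})

dataset = ds.dataset('dataset_root', format='ipc',
                     partitioning=partitioning)
```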
Closes #9130 from bkietz/10247-Cannot-write-dataset-with
Lead-authored-by: Benjamin Kietzman <be...@gmail.com>
Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
cpp/src/arrow/dataset/expression.cc | 57 +++++++-------
cpp/src/arrow/dataset/file_base.cc | 12 ++-
cpp/src/arrow/dataset/file_base.h | 3 +
cpp/src/arrow/dataset/file_ipc_test.cc | 12 +++
cpp/src/arrow/dataset/partition.cc | 109 ++++++++++++++++++---------
cpp/src/arrow/dataset/partition.h | 56 ++++++++++++--
cpp/src/arrow/dataset/partition_test.cc | 61 +++++++++++++++
python/pyarrow/_dataset.pyx | 57 +++++++++++---
python/pyarrow/dataset.py | 21 +++++-
python/pyarrow/includes/libarrow_dataset.pxd | 7 +-
python/pyarrow/tests/test_dataset.py | 50 ++++++++++++
testing | 2 +-
12 files changed, 356 insertions(+), 91 deletions(-)
diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/dataset/expression.cc
index 296fb22..e7acac3 100644
--- a/cpp/src/arrow/dataset/expression.cc
+++ b/cpp/src/arrow/dataset/expression.cc
@@ -90,27 +90,35 @@ ValueDescr Expression::descr() const {
return CallNotNull(*this)->descr;
}
+namespace {
+
+std::string PrintDatum(const Datum& datum) {
+ if (datum.is_scalar()) {
+ switch (datum.type()->id()) {
+ case Type::STRING:
+ case Type::LARGE_STRING:
+ return '"' +
+ Escape(util::string_view(*datum.scalar_as<BaseBinaryScalar>().value)) +
+ '"';
+
+ case Type::BINARY:
+ case Type::FIXED_SIZE_BINARY:
+ case Type::LARGE_BINARY:
+ return '"' + datum.scalar_as<BaseBinaryScalar>().value->ToHexString() + '"';
+
+ default:
+ break;
+ }
+ return datum.scalar()->ToString();
+ }
+ return datum.ToString();
+}
+
+} // namespace
+
std::string Expression::ToString() const {
if (auto lit = literal()) {
- if (lit->is_scalar()) {
- switch (lit->type()->id()) {
- case Type::STRING:
- case Type::LARGE_STRING:
- return '"' +
- Escape(util::string_view(*lit->scalar_as<BaseBinaryScalar>().value)) +
- '"';
-
- case Type::BINARY:
- case Type::FIXED_SIZE_BINARY:
- case Type::LARGE_BINARY:
- return '"' + lit->scalar_as<BaseBinaryScalar>().value->ToHexString() + '"';
-
- default:
- break;
- }
- return lit->scalar()->ToString();
- }
- return lit->ToString();
+ return PrintDatum(*lit);
}
if (auto ref = field_ref()) {
@@ -763,16 +771,7 @@ Status ExtractKnownFieldValuesImpl(
auto ref = call->arguments[0].field_ref();
auto lit = call->arguments[1].literal();
- auto it_success = known_values->emplace(*ref, *lit);
- if (it_success.second) continue;
-
- // A value was already known for ref; check it
- auto ref_lit = it_success.first;
- if (*lit != ref_lit->second) {
- return Status::Invalid("Conflicting guarantees: (", ref->ToString(),
- " == ", lit->ToString(), ") vs (", ref->ToString(),
- " == ", ref_lit->second.ToString());
- }
+ known_values->emplace(*ref, *lit);
}
conjunction_members->erase(unconsumed_end, conjunction_members->end());
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2c437ce..612c249 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -313,12 +313,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory
+ if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) {
+ return Status::Invalid("Fragment would be written into ", groups.batches.size(),
+ " partitions. This exceeds the maximum of ",
+ write_options.max_partitions);
+ }
+
std::unordered_set<WriteQueue*> need_flushed;
for (size_t i = 0; i < groups.batches.size(); ++i) {
- ARROW_ASSIGN_OR_RAISE(
- auto partition_expression,
- and_(std::move(groups.expressions[i]), fragment->partition_expression())
- .Bind(*scanner->schema()));
+ auto partition_expression =
+ and_(std::move(groups.expressions[i]), fragment->partition_expression());
auto batch = std::move(groups.batches[i]);
ARROW_ASSIGN_OR_RAISE(auto part,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index bb2aa86..708f7e0 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -295,6 +295,9 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// Partitioning used to generate fragment paths.
std::shared_ptr<Partitioning> partitioning;
+ /// Maximum number of partitions any batch may be written into, default is 1K.
+ int max_partitions = 1024;
+
/// Template string used to generate fragment basenames.
/// {i} will be replaced by an auto incremented integer.
std::string basename_template;
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 4242113..b8428e0 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -223,6 +223,18 @@ TEST_F(TestIpcFileSystemDataset, WriteWithEmptyPartitioningSchema) {
TestWriteWithEmptyPartitioningSchema();
}
+TEST_F(TestIpcFileSystemDataset, WriteExceedsMaxPartitions) {
+ write_options_.partitioning = std::make_shared<DirectoryPartitioning>(
+ SchemaFromColumnNames(source_schema_, {"model"}));
+
+ // require that no batch be grouped into more than 2 written batches:
+ write_options_.max_partitions = 2;
+
+ auto scanner = std::make_shared<Scanner>(dataset_, scan_options_, scan_context_);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("This exceeds the maximum"),
+ FileSystemDataset::Write(write_options_, scanner));
+}
+
TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) {
std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
auto result = format_->Inspect(FileSource(buf));
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 3a164d8..d6a3723 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -23,6 +23,7 @@
#include <vector>
#include "arrow/array/array_base.h"
+#include "arrow/array/array_dict.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/builder_dict.h"
#include "arrow/compute/api_scalar.h"
@@ -191,7 +192,7 @@ Result<Expression> KeyValuePartitioning::Parse(const std::string& path) const {
}
Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
- std::vector<Scalar*> values{static_cast<size_t>(schema_->num_fields()), nullptr};
+ ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};
ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr));
for (const auto& ref_value : known_values) {
@@ -202,7 +203,7 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
ARROW_ASSIGN_OR_RAISE(auto match, ref_value.first.FindOneOrNone(*schema_));
if (match.empty()) continue;
- const auto& value = ref_value.second.scalar();
+ auto value = ref_value.second.scalar();
const auto& field = schema_->field(match[0]);
if (!value->type->Equals(field->type())) {
@@ -210,7 +211,12 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const {
") is invalid for ", field->ToString());
}
- values[match[0]] = value.get();
+ if (value->type->id() == Type::DICTIONARY) {
+ ARROW_ASSIGN_OR_RAISE(
+ value, checked_cast<const DictionaryScalar&>(*value).GetEncodedValue());
+ }
+
+ values[match[0]] = std::move(value);
}
return FormatValues(values);
@@ -230,9 +236,9 @@ std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
return keys;
}
-inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int first_null) {
+inline util::optional<int> NextValid(const ScalarVector& values, int first_null) {
auto it = std::find_if(values.begin() + first_null + 1, values.end(),
- [](Scalar* v) { return v != nullptr; });
+ [](const std::shared_ptr<Scalar>& v) { return v != nullptr; });
if (it == values.end()) {
return util::nullopt;
@@ -242,7 +248,7 @@ inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int fir
}
Result<std::string> DirectoryPartitioning::FormatValues(
- const std::vector<Scalar*>& values) const {
+ const ScalarVector& values) const {
std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
for (int i = 0; i < schema_->num_fields(); ++i) {
@@ -426,8 +432,7 @@ std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
return keys;
}
-Result<std::string> HivePartitioning::FormatValues(
- const std::vector<Scalar*>& values) const {
+Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) const {
std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
for (int i = 0; i < schema_->num_fields(); ++i) {
@@ -532,19 +537,21 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
// Transform an array of counts to offsets which will divide a ListArray
// into an equal number of slices with corresponding lengths.
-inline Result<std::shared_ptr<Array>> CountsToOffsets(
+inline Result<std::shared_ptr<Buffer>> CountsToOffsets(
std::shared_ptr<Int64Array> counts) {
- Int32Builder offset_builder;
+ TypedBufferBuilder<int32_t> offset_builder;
RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
- offset_builder.UnsafeAppend(0);
+
+ int32_t current_offset = 0;
+ offset_builder.UnsafeAppend(current_offset);
for (int64_t i = 0; i < counts->length(); ++i) {
DCHECK_NE(counts->Value(i), 0);
- auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i));
- offset_builder.UnsafeAppend(next_offset);
+ current_offset += static_cast<int32_t>(counts->Value(i));
+ offset_builder.UnsafeAppend(current_offset);
}
- std::shared_ptr<Array> offsets;
+ std::shared_ptr<Buffer> offsets;
RETURN_NOT_OK(offset_builder.Finish(&offsets));
return offsets;
}
@@ -604,6 +611,12 @@ class StructDictionary {
RETURN_NOT_OK(builders[i].FinishInternal(&indices));
ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices));
+
+ if (fields[i]->type()->id() == Type::DICTIONARY) {
+ RETURN_NOT_OK(RestoreDictionaryEncoding(
+ checked_pointer_cast<DictionaryType>(fields[i]->type()), &column));
+ }
+
columns[i] = column.make_array();
}
@@ -612,27 +625,22 @@ class StructDictionary {
private:
Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
- ArrayData* encoded;
if (column.type()->id() != Type::DICTIONARY) {
- ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column));
+ ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(std::move(column)));
}
- encoded = column.mutable_array();
-
- auto indices =
- std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1]));
- dictionaries_.push_back(MakeArray(std::move(encoded->dictionary)));
- auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length());
+ auto dict_column = column.array_as<DictionaryArray>();
+ dictionaries_.push_back(dict_column->dictionary());
+ ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), int32()));
if (*fused_indices == nullptr) {
- *fused_indices = std::move(indices);
- size_ = dictionary_size;
- return Status::OK();
+ *fused_indices = checked_pointer_cast<Int32Array>(std::move(indices));
+ return IncreaseSize();
}
// It's useful to think about the case where each of dictionaries_ has size 10.
// In this case the decimal digit in the ones place is the code in dictionaries_[0],
- // the tens place corresponds to dictionaries_[1], etc.
+ // the tens place corresponds to the code in dictionaries_[1], etc.
// The incumbent indices must be shifted to the hundreds place so as not to collide.
ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
compute::Multiply(indices, MakeScalar(size_)));
@@ -641,10 +649,7 @@ class StructDictionary {
compute::Add(new_fused_indices, *fused_indices));
*fused_indices = checked_pointer_cast<Int32Array>(new_fused_indices.make_array());
-
- // XXX should probably cap this at 2**15 or so
- ARROW_CHECK(!internal::MultiplyWithOverflow(size_, dictionary_size, &size_));
- return Status::OK();
+ return IncreaseSize();
}
// expand a fused code into component dict codes, order is in order of addition
@@ -656,13 +661,48 @@ class StructDictionary {
}
}
- int32_t size_;
+ Status RestoreDictionaryEncoding(std::shared_ptr<DictionaryType> expected_type,
+ Datum* column) {
+ DCHECK_NE(column->type()->id(), Type::DICTIONARY);
+ ARROW_ASSIGN_OR_RAISE(*column, compute::DictionaryEncode(std::move(*column)));
+
+ if (expected_type->index_type()->id() == Type::INT32) {
+ // dictionary_encode has already yielded the expected index_type
+ return Status::OK();
+ }
+
+ // cast the indices to the expected index type
+ auto dictionary = std::move(column->mutable_array()->dictionary);
+ column->mutable_array()->type = int32();
+
+ ARROW_ASSIGN_OR_RAISE(*column,
+ compute::Cast(std::move(*column), expected_type->index_type()));
+
+ column->mutable_array()->dictionary = std::move(dictionary);
+ column->mutable_array()->type = expected_type;
+ return Status::OK();
+ }
+
+ Status IncreaseSize() {
+ auto factor = static_cast<int32_t>(dictionaries_.back()->length());
+
+ if (internal::MultiplyWithOverflow(size_, factor, &size_)) {
+ return Status::CapacityError("Max groups exceeded");
+ }
+ return Status::OK();
+ }
+
+ int32_t size_ = 1;
ArrayVector dictionaries_;
};
Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) {
if (by.num_fields() == 0) {
- return Status::NotImplemented("Grouping with no criteria");
+ return Status::Invalid("Grouping with no criteria");
+ }
+
+ if (by.null_count() != 0) {
+ return Status::Invalid("Grouping with null criteria");
}
ARROW_ASSIGN_OR_RAISE(auto fused, StructDictionary::Encode(by.fields()));
@@ -685,8 +725,9 @@ Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) {
checked_pointer_cast<Int64Array>(fused_counts_and_values->GetFieldByName("counts"));
ARROW_ASSIGN_OR_RAISE(auto offsets, CountsToOffsets(std::move(counts)));
- ARROW_ASSIGN_OR_RAISE(auto grouped_sort_indices,
- ListArray::FromArrays(*offsets, *sort_indices));
+ auto grouped_sort_indices =
+ std::make_shared<ListArray>(list(sort_indices->type()), unique_rows->length(),
+ std::move(offsets), std::move(sort_indices));
return StructArray::Make(
ArrayVector{std::move(unique_rows), std::move(grouped_sort_indices)},
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 8975f56..944434e 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -142,7 +142,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
virtual std::vector<Key> ParseKeys(const std::string& path) const = 0;
- virtual Result<std::string> FormatValues(const std::vector<Scalar*>& values) const = 0;
+ virtual Result<std::string> FormatValues(const ScalarVector& values) const = 0;
/// Convert a Key to a full expression.
Result<Expression> ConvertKey(const Key& key) const;
@@ -172,7 +172,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
private:
std::vector<Key> ParseKeys(const std::string& path) const override;
- Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override;
+ Result<std::string> FormatValues(const ScalarVector& values) const override;
};
/// \brief Multi-level, directory based partitioning
@@ -201,7 +201,7 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
private:
std::vector<Key> ParseKeys(const std::string& path) const override;
- Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override;
+ Result<std::string> FormatValues(const ScalarVector& values) const override;
};
/// \brief Implementation provided by lambda or other callable
@@ -288,16 +288,58 @@ class ARROW_DS_EXPORT PartitioningOrFactory {
/// \brief Assemble lists of indices of identical rows.
///
/// \param[in] by A StructArray whose columns will be used as grouping criteria.
-/// \return A StructArray mapping unique rows (in field "values", represented as a
-/// StructArray with the same fields as `by`) to lists of indices where
-/// that row appears (in field "groupings").
+/// Top level nulls are invalid, as are empty criteria (no grouping
+/// columns).
+/// \return A array of type `struct<values: by.type, groupings: list<int64>>`,
+/// which is a mapping from unique rows (field "values") to lists of
+/// indices into `by` where that row appears (field "groupings").
+///
+/// For example,
+/// MakeGroupings([
+/// {"a": "ex", "b": 0},
+/// {"a": "ex", "b": 0},
+/// {"a": "why", "b": 0},
+/// {"a": "why", "b": 0},
+/// {"a": "ex", "b": 0},
+/// {"a": "why", "b": 1}
+/// ]) == [
+/// {"values": {"a": "ex", "b": 0}, "groupings": [0, 1, 4]},
+/// {"values": {"a": "why", "b": 0}, "groupings": [2, 3]},
+/// {"values": {"a": "why", "b": 1}, "groupings": [5]}
+/// ]
ARROW_DS_EXPORT
Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by);
-/// \brief Produce slices of an Array which correspond to the provided groupings.
+/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
+/// the provided groupings.
+///
+/// For example,
+/// ApplyGroupings([[0, 1, 4], [2, 3], [5]], [
+/// {"a": "ex", "b": 0, "passenger": 0},
+/// {"a": "ex", "b": 0, "passenger": 1},
+/// {"a": "why", "b": 0, "passenger": 2},
+/// {"a": "why", "b": 0, "passenger": 3},
+/// {"a": "ex", "b": 0, "passenger": 4},
+/// {"a": "why", "b": 1, "passenger": 5}
+/// ]) == [
+/// [
+/// {"a": "ex", "b": 0, "passenger": 0},
+/// {"a": "ex", "b": 0, "passenger": 1},
+/// {"a": "ex", "b": 0, "passenger": 4},
+/// ],
+/// [
+/// {"a": "why", "b": 0, "passenger": 2},
+/// {"a": "why", "b": 0, "passenger": 3},
+/// ],
+/// [
+/// {"a": "why", "b": 1, "passenger": 5}
+/// ]
+/// ]
ARROW_DS_EXPORT
Result<std::shared_ptr<ListArray>> ApplyGroupings(const ListArray& groupings,
const Array& array);
+
+/// \brief Produce selections of a RecordBatch which correspond to the provided groupings.
ARROW_DS_EXPORT
Result<RecordBatchVector> ApplyGroupings(const ListArray& groupings,
const std::shared_ptr<RecordBatch>& batch);
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 2260eb2..286848d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -32,6 +32,7 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/range.h"
namespace arrow {
using internal::checked_pointer_cast;
@@ -152,6 +153,16 @@ TEST_F(TestPartitioning, DirectoryPartitioningFormat) {
equal(field_ref("beta"), literal("hello"))));
}
+TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionary) {
+ auto dictionary = ArrayFromJSON(utf8(), R"(["hello", "world"])");
+ partitioning_ = std::make_shared<DirectoryPartitioning>(schema({DictStr("alpha")}),
+ ArrayVector{dictionary});
+ written_schema_ = partitioning_->schema();
+
+ ASSERT_OK_AND_ASSIGN(auto dict_hello, MakeScalar("hello")->CastTo(DictStr("")->type()));
+ AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
+}
+
TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) {
for (auto temporal : {timestamp(TimeUnit::SECOND), date32()}) {
partitioning_ = std::make_shared<DirectoryPartitioning>(
@@ -549,12 +560,14 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json,
}));
ASSERT_OK_AND_ASSIGN(auto groupings_and_values, MakeGroupings(*by));
+ ASSERT_OK(groupings_and_values->ValidateFull());
auto groupings =
checked_pointer_cast<ListArray>(groupings_and_values->GetFieldByName("groupings"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> grouped_ids,
ApplyGroupings(*groupings, *batch->GetColumnByName("id")));
+ ASSERT_OK(grouped_ids->ValidateFull());
ArrayVector columns =
checked_cast<const StructArray&>(*groupings_and_values->GetFieldByName("values"))
@@ -562,6 +575,7 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json,
columns.push_back(grouped_ids);
ASSERT_OK_AND_ASSIGN(auto actual, StructArray::Make(columns, fields_with_ids));
+ ASSERT_OK(actual->ValidateFull());
AssertArraysEqual(*expected, *actual, /*verbose=*/true);
}
@@ -585,5 +599,52 @@ TEST(GroupTest, Basics) {
])");
}
+TEST(GroupTest, WithNulls) {
+ auto has_nulls = checked_pointer_cast<StructArray>(
+ ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([
+ {"a": "ex", "b": 0},
+ {"a": null, "b": 0},
+ {"a": "why", "b": 0},
+ {"a": "ex", "b": 1},
+ {"a": "why", "b": 0},
+ {"a": "ex", "b": 1},
+ {"a": "ex", "b": 0},
+ {"a": "why", "b": null}
+ ])"));
+ ASSERT_RAISES(NotImplemented, MakeGroupings(*has_nulls));
+
+ has_nulls = checked_pointer_cast<StructArray>(
+ ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([
+ {"a": "ex", "b": 0},
+ null,
+ {"a": "why", "b": 0},
+ {"a": "ex", "b": 1},
+ {"a": "why", "b": 0},
+ {"a": "ex", "b": 1},
+ {"a": "ex", "b": 0},
+ null
+ ])"));
+ ASSERT_RAISES(Invalid, MakeGroupings(*has_nulls));
+}
+
+TEST(GroupTest, GroupOnDictionary) {
+ AssertGrouping({field("a", dictionary(int32(), utf8())), field("b", int32())}, R"([
+ {"a": "ex", "b": 0, "id": 0},
+ {"a": "ex", "b": 0, "id": 1},
+ {"a": "why", "b": 0, "id": 2},
+ {"a": "ex", "b": 1, "id": 3},
+ {"a": "why", "b": 0, "id": 4},
+ {"a": "ex", "b": 1, "id": 5},
+ {"a": "ex", "b": 0, "id": 6},
+ {"a": "why", "b": 1, "id": 7}
+ ])",
+ R"([
+ {"a": "ex", "b": 0, "ids": [0, 1, 6]},
+ {"a": "why", "b": 0, "ids": [2, 4]},
+ {"a": "ex", "b": 1, "ids": [3, 5]},
+ {"a": "why", "b": 1, "ids": [7]}
+ ])");
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 410ca12..151ae81 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1403,6 +1403,25 @@ cdef class PartitioningFactory(_Weakrefable):
return self.wrapped
+cdef vector[shared_ptr[CArray]] _partitioning_dictionaries(
+ Schema schema, dictionaries) except *:
+ cdef:
+ vector[shared_ptr[CArray]] c_dictionaries
+
+ dictionaries = dictionaries or {}
+
+ for field in schema:
+ dictionary = dictionaries.get(field.name)
+
+ if (isinstance(field.type, pa.DictionaryType) and
+ dictionary is not None):
+ c_dictionaries.push_back(pyarrow_unwrap_array(dictionary))
+ else:
+ c_dictionaries.push_back(<shared_ptr[CArray]> nullptr)
+
+ return c_dictionaries
+
+
cdef class DirectoryPartitioning(Partitioning):
"""
A Partitioning based on a specified Schema.
@@ -1416,6 +1435,11 @@ cdef class DirectoryPartitioning(Partitioning):
----------
schema : Schema
The schema that describes the partitions present in the file path.
+ dictionaries : Dict[str, Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing.
Returns
-------
@@ -1433,12 +1457,15 @@ cdef class DirectoryPartitioning(Partitioning):
cdef:
CDirectoryPartitioning* directory_partitioning
- def __init__(self, Schema schema not None):
- cdef shared_ptr[CDirectoryPartitioning] partitioning
- partitioning = make_shared[CDirectoryPartitioning](
- pyarrow_unwrap_schema(schema)
+ def __init__(self, Schema schema not None, dictionaries=None):
+ cdef:
+ shared_ptr[CDirectoryPartitioning] c_partitioning
+
+ c_partitioning = make_shared[CDirectoryPartitioning](
+ pyarrow_unwrap_schema(schema),
+ _partitioning_dictionaries(schema, dictionaries)
)
- self.init(<shared_ptr[CPartitioning]> partitioning)
+ self.init(<shared_ptr[CPartitioning]> c_partitioning)
cdef init(self, const shared_ptr[CPartitioning]& sp):
Partitioning.init(self, sp)
@@ -1506,6 +1533,11 @@ cdef class HivePartitioning(Partitioning):
----------
schema : Schema
The schema that describes the partitions present in the file path.
+ dictionaries : Dict[str, Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing.
Returns
-------
@@ -1524,12 +1556,15 @@ cdef class HivePartitioning(Partitioning):
cdef:
CHivePartitioning* hive_partitioning
- def __init__(self, Schema schema not None):
- cdef shared_ptr[CHivePartitioning] partitioning
- partitioning = make_shared[CHivePartitioning](
- pyarrow_unwrap_schema(schema)
+ def __init__(self, Schema schema not None, dictionaries=None):
+ cdef:
+ shared_ptr[CHivePartitioning] c_partitioning
+
+ c_partitioning = make_shared[CHivePartitioning](
+ pyarrow_unwrap_schema(schema),
+ _partitioning_dictionaries(schema, dictionaries)
)
- self.init(<shared_ptr[CPartitioning]> partitioning)
+ self.init(<shared_ptr[CPartitioning]> c_partitioning)
cdef init(self, const shared_ptr[CPartitioning]& sp):
Partitioning.init(self, sp)
@@ -2270,6 +2305,7 @@ def _filesystemdataset_write(
Schema schema not None, FileSystem filesystem not None,
Partitioning partitioning not None,
FileWriteOptions file_options not None, bint use_threads,
+ int max_partitions,
):
"""
CFileSystemDataset.Write wrapper
@@ -2283,6 +2319,7 @@ def _filesystemdataset_write(
c_options.filesystem = filesystem.unwrap()
c_options.base_dir = tobytes(_stringify_path(base_dir))
c_options.partitioning = partitioning.unwrap()
+ c_options.max_partitions = max_partitions
c_options.basename_template = tobytes(basename_template)
if isinstance(data, Dataset):
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index adeb8ae..a7aa9c4 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -88,7 +88,8 @@ def scalar(value):
return Expression._scalar(value)
-def partitioning(schema=None, field_names=None, flavor=None):
+def partitioning(schema=None, field_names=None, flavor=None,
+ dictionaries=None):
"""
Specify a partitioning scheme.
@@ -121,6 +122,11 @@ def partitioning(schema=None, field_names=None, flavor=None):
flavor : str, default None
The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
a HivePartitioning.
+ dictionaries : List[Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing.
Returns
-------
@@ -158,7 +164,7 @@ def partitioning(schema=None, field_names=None, flavor=None):
if field_names is not None:
raise ValueError(
"Cannot specify both 'schema' and 'field_names'")
- return DirectoryPartitioning(schema)
+ return DirectoryPartitioning(schema, dictionaries)
elif field_names is not None:
if isinstance(field_names, list):
return DirectoryPartitioning.discover(field_names)
@@ -175,7 +181,7 @@ def partitioning(schema=None, field_names=None, flavor=None):
raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
elif schema is not None:
if isinstance(schema, pa.Schema):
- return HivePartitioning(schema)
+ return HivePartitioning(schema, dictionaries)
else:
raise ValueError(
"Expected Schema for 'schema', got {}".format(
@@ -635,7 +641,8 @@ def _ensure_write_partitioning(scheme):
def write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, schema=None,
- filesystem=None, file_options=None, use_threads=True):
+ filesystem=None, file_options=None, use_threads=True,
+ max_partitions=None):
"""
Write a dataset to a given format and partitioning.
@@ -668,6 +675,8 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
use_threads : bool, default True
Write files in parallel. If enabled, then maximum parallelism will be
used determined by the number of available CPU cores.
+ max_partitions : int, default 1024
+ Maximum number of partitions any batch may be written into.
"""
from pyarrow.fs import LocalFileSystem, _ensure_filesystem
@@ -700,6 +709,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
if basename_template is None:
basename_template = "part-{i}." + format.default_extname
+ if max_partitions is None:
+ max_partitions = 1024
+
partitioning = _ensure_write_partitioning(partitioning)
if filesystem is None:
@@ -711,4 +723,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
_filesystemdataset_write(
data, base_dir, basename_template, schema,
filesystem, partitioning, file_options, use_threads,
+ max_partitions
)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 73803c0..362967d 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -213,6 +213,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CFileSystem] filesystem
c_string base_dir
shared_ptr[CPartitioning] partitioning
+ int max_partitions
c_string basename_template
cdef cppclass CFileSystemDataset \
@@ -277,7 +278,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CDirectoryPartitioning \
"arrow::dataset::DirectoryPartitioning"(CPartitioning):
- CDirectoryPartitioning(shared_ptr[CSchema] schema)
+ CDirectoryPartitioning(shared_ptr[CSchema] schema,
+ vector[shared_ptr[CArray]] dictionaries)
@staticmethod
shared_ptr[CPartitioningFactory] MakeFactory(
@@ -285,7 +287,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CHivePartitioning \
"arrow::dataset::HivePartitioning"(CPartitioning):
- CHivePartitioning(shared_ptr[CSchema] schema)
+ CHivePartitioning(shared_ptr[CSchema] schema,
+ vector[shared_ptr[CArray]] dictionaries)
@staticmethod
shared_ptr[CPartitioningFactory] MakeFactory(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 0ab9d95..c3ad83b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2317,6 +2317,32 @@ def test_write_dataset_partitioned(tempdir):
@pytest.mark.parquet
@pytest.mark.pandas
+def test_write_dataset_partitioned_dict(tempdir):
+ directory = tempdir / "partitioned"
+ _ = _create_parquet_dataset_partitioned(directory)
+
+ # directory partitioning, dictionary partition columns
+ dataset = ds.dataset(
+ directory,
+ partitioning=ds.HivePartitioning.discover(infer_dictionary=True))
+ target = tempdir / 'partitioned-dir-target'
+ expected_paths = [
+ target / "a", target / "a" / "part-0.feather",
+ target / "b", target / "b" / "part-1.feather"
+ ]
+ partitioning = ds.partitioning(pa.schema([
+ dataset.schema.field('part')]),
+ dictionaries={'part': pa.array(['a', 'b'])})
+ # NB: dictionaries required here since we use partitioning to parse
+ # directories in _check_dataset_roundtrip (not currently required for
+ # the formatting step)
+ _check_dataset_roundtrip(
+ dataset, str(target), expected_paths, target,
+ partitioning=partitioning)
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
def test_write_dataset_use_threads(tempdir):
directory = tempdir / "partitioned"
_ = _create_parquet_dataset_partitioned(directory)
@@ -2412,6 +2438,30 @@ def test_write_table_multiple_fragments(tempdir):
)
+def test_write_table_partitioned_dict(tempdir):
+ # ensure writing table partitioned on a dictionary column works without
+ # specifying the dictionary values explicitly
+ table = pa.table([
+ pa.array(range(20)),
+ pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(),
+ ], names=['col', 'part'])
+
+ partitioning = ds.partitioning(table.select(["part"]).schema)
+
+ base_dir = tempdir / "dataset"
+ ds.write_dataset(
+ table, base_dir, format="feather", partitioning=partitioning
+ )
+
+ # check roundtrip
+ partitioning_read = ds.DirectoryPartitioning.discover(
+ ["part"], infer_dictionary=True)
+ result = ds.dataset(
+ base_dir, format="ipc", partitioning=partitioning_read
+ ).to_table()
+ assert result.equals(table)
+
+
@pytest.mark.parquet
def test_write_dataset_parquet(tempdir):
import pyarrow.parquet as pq
diff --git a/testing b/testing
index b4eeafd..d6c4deb 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit b4eeafdec6fb5284c4aaf269f2ebdb3be2c63ed5
+Subproject commit d6c4deb22c4b4e9e3247a2f291046e3c671ad235