You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/01/11 08:51:07 UTC

[arrow] branch master updated: ARROW-5336: [C++] Implement arrow::Concatenate for dictionary-encoded arrays with unequal dictionaries

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f8160  ARROW-5336: [C++] Implement arrow::Concatenate for dictionary-encoded arrays with unequal dictionaries
97f8160 is described below

commit 97f8160e56036909cdf554860baa341c46d3caf3
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jan 11 09:49:48 2021 +0100

    ARROW-5336: [C++] Implement arrow::Concatenate for dictionary-encoded arrays with unequal dictionaries
    
    The dictionaries still need to have the same index & value types.  It is possible that concatenating two dictionaries still fails because the resulting dictionary has more values than its index type can represent.
    
    The unification will still fail if nulls are present in either dictionary.  The canonical approach seems to be representing nulls in the indices array with a validity bitmap.  The existing unifier had this constraint in place.  My guess is that this was to avoid making the memo table null-aware.  It could be handled without modification to the memo table by using a -1 index and so I could easily add this if desired.  I wasn't sure if support for this non-typical case justified the com [...]
    
    Closes #8984 from westonpace/feature/arrow-5336
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/array/array_dict.cc       |  18 +++++
 cpp/src/arrow/array/concatenate.cc      |  83 ++++++++++++++++++---
 cpp/src/arrow/array/concatenate_test.cc | 127 ++++++++++++++++++++++++++++++++
 cpp/src/arrow/type.h                    |   6 ++
 4 files changed, 223 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/array/array_dict.cc b/cpp/src/arrow/array/array_dict.cc
index 614c81d..dc9c23c 100644
--- a/cpp/src/arrow/array/array_dict.cc
+++ b/cpp/src/arrow/array/array_dict.cc
@@ -29,6 +29,7 @@
 #include "arrow/array/dict_internal.h"
 #include "arrow/array/util.h"
 #include "arrow/buffer.h"
+#include "arrow/datum.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
@@ -205,6 +206,23 @@ class DictionaryUnifierImpl : public DictionaryUnifier {
     return Status::OK();
   }
 
+  Status GetResultWithIndexType(std::shared_ptr<DataType> index_type,
+                                std::shared_ptr<Array>* out_dict) override {
+    int64_t dict_length = memo_table_.size();  // entries accumulated in the memo table
+    if (!internal::IntegersCanFit(Datum(dict_length), *index_type).ok()) {
+      return Status::Invalid(
+          "These dictionaries cannot be combined.  The unified dictionary requires a "
+          "larger index type.");
+    }
+    // NOTE(review): tests the length, not the largest index (length - 1) -- confirm.
+    // Build the unified dictionary array from the memo table and return it via out_dict
+    std::shared_ptr<ArrayData> data;
+    RETURN_NOT_OK(DictTraits::GetDictionaryArrayData(pool_, value_type_, memo_table_,
+                                                     0 /* start_offset */, &data));
+    *out_dict = MakeArray(data);
+    return Status::OK();
+  }
+
  private:
   MemoryPool* pool_;
   std::shared_ptr<DataType> value_type_;
diff --git a/cpp/src/arrow/array/concatenate.cc b/cpp/src/arrow/array/concatenate.cc
index a22b28b..1aa33c0 100644
--- a/cpp/src/arrow/array/concatenate.cc
+++ b/cpp/src/arrow/array/concatenate.cc
@@ -36,6 +36,7 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util.h"
 #include "arrow/util/int_util_internal.h"
 #include "arrow/util/logging.h"
 #include "arrow/visitor_inline.h"
@@ -44,6 +45,7 @@ namespace arrow {
 
 using internal::SafeSignedAdd;
 
+namespace {
 /// offset, length pair for representing a Range of a buffer or array
 struct Range {
   int64_t offset = -1, length = 0;
@@ -66,8 +68,8 @@ struct Bitmap {
 };
 
 // Allocate a buffer and concatenate bitmaps into it.
-static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
-                                 std::shared_ptr<Buffer>* out) {
+Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
+                          std::shared_ptr<Buffer>* out) {
   int64_t out_length = 0;
   for (const auto& bitmap : bitmaps) {
     if (internal::AddWithOverflow(out_length, bitmap.range.length, &out_length)) {
@@ -94,15 +96,15 @@ static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool*
 // Write offsets in src into dst, adjusting them such that first_offset
 // will be the first offset written.
 template <typename Offset>
-static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
-                         Offset* dst, Range* values_range);
+Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset, Offset* dst,
+                  Range* values_range);
 
 // Concatenate buffers holding offsets into a single buffer of offsets,
 // also computing the ranges of values spanned by each buffer of offsets.
 template <typename Offset>
-static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
-                                 std::shared_ptr<Buffer>* out,
-                                 std::vector<Range>* values_ranges) {
+Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
+                          std::shared_ptr<Buffer>* out,
+                          std::vector<Range>* values_ranges) {
   values_ranges->resize(buffers.size());
 
   // allocate output buffer
@@ -130,8 +132,8 @@ static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
 }
 
 template <typename Offset>
-static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
-                         Offset* dst, Range* values_range) {
+Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset, Offset* dst,
+                  Range* values_range) {
   if (src->size() == 0) {
     // It's allowed to have an empty offsets buffer for a 0-length array
     // (see Array::Validate)
@@ -163,6 +165,44 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {  // type visitor: remaps and concatenates index buffers
+  DictionaryConcatenate(BufferVector& index_buffers, BufferVector& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+  // Any non-integer index type is rejected.
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+  // Integer index types: remap each input buffer through its lookup and append to out_.
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;  // total size in bytes of all input index buffers
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    CType* out_data = reinterpret_cast<CType*>(out_->mutable_data());
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      const auto& buffer = index_buffers_[i];
+      auto size = buffer->size() / sizeof(CType);  // number of indices in this buffer
+      auto old_indices = reinterpret_cast<const CType*>(buffer->data());
+      auto indices_map = reinterpret_cast<const int32_t*>(index_lookup_[i]->data());
+      internal::TransposeInts(old_indices, out_data, size, indices_map);
+      out_data += size;  // NOTE(review): every slot is remapped, null slots too -- confirm
+    }
+    return Status::OK();
+  }
+  // out_ holds the result; both vectors are borrowed and must outlive this object.
+  std::shared_ptr<Buffer> out_;
+  const BufferVector& index_buffers_;
+  const BufferVector& index_lookup_;
+  MemoryPool* pool_;
+};
+
 class ConcatenateImpl {
  public:
   ConcatenateImpl(const std::vector<std::shared_ptr<const ArrayData>>& in,
@@ -255,6 +295,21 @@ class ConcatenateImpl {
     return Status::OK();
   }
 
+  Result<BufferVector> UnifyDictionaries(const DictionaryType& d) {
+    BufferVector new_index_lookup;  // one buffer per input, filled in by Unify() below
+    ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(d.value_type()));
+    new_index_lookup.resize(in_.size());
+    for (size_t i = 0; i < in_.size(); i++) {
+      auto item = in_[i];
+      auto dictionary_array = MakeArray(item->dictionary);
+      RETURN_NOT_OK(unifier->Unify(*dictionary_array, &new_index_lookup[i]));
+    }
+    std::shared_ptr<Array> out_dictionary;
+    RETURN_NOT_OK(unifier->GetResultWithIndexType(d.index_type(), &out_dictionary));
+    out_->dictionary = out_dictionary->data();  // side effect: sets the output dictionary
+    return new_index_lookup;  // used by the caller to remap each input's indices
+  }
+
   Status Visit(const DictionaryType& d) {
     auto fixed = internal::checked_cast<const FixedWidthType*>(d.index_type().get());
 
@@ -269,12 +324,16 @@ class ConcatenateImpl {
       }
     }
 
+    ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));
     if (dictionaries_same) {
       out_->dictionary = in_[0]->dictionary;
-      ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));
       return ConcatenateBuffers(index_buffers, pool_).Value(&out_->buffers[1]);
     } else {
-      return Status::NotImplemented("Concat with dictionary unification NYI");
+      ARROW_ASSIGN_OR_RAISE(auto index_lookup, UnifyDictionaries(d));
+      DictionaryConcatenate concatenate(index_buffers, index_lookup, pool_);
+      RETURN_NOT_OK(VisitTypeInline(*d.index_type(), &concatenate));
+      out_->buffers[1] = std::move(concatenate.out_);
+      return Status::OK();
     }
   }
 
@@ -416,6 +475,8 @@ class ConcatenateImpl {
   std::shared_ptr<ArrayData> out_;
 };
 
+}  // namespace
+
 Result<std::shared_ptr<Array>> Concatenate(const ArrayVector& arrays, MemoryPool* pool) {
   if (arrays.size() == 0) {
     return Status::Invalid("Must pass at least one array");
diff --git a/cpp/src/arrow/array/concatenate_test.cc b/cpp/src/arrow/array/concatenate_test.cc
index 4289678..c6e6e5b 100644
--- a/cpp/src/arrow/array/concatenate_test.cc
+++ b/cpp/src/arrow/array/concatenate_test.cc
@@ -225,6 +225,133 @@ TEST_F(ConcatenateTest, DictionaryType) {
   });
 }
 
+TEST_F(ConcatenateTest, DictionaryTypeDifferentDictionaries) {
+  {  // Disjoint dictionaries: the second array's indices are shifted by 4
+    auto dict_type = dictionary(uint8(), utf8());
+    auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
+                                      "[\"A0\", \"A1\", \"A2\", \"A3\"]");
+    auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
+                                      "[\"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
+    auto concat_expected = DictArrayFromJSON(
+        dict_type, "[1, 2, null, 3, 0, null, 8, 6, 5]",
+        "[\"A0\", \"A1\", \"A2\", \"A3\", \"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
+    ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
+    AssertArraysEqual(*concat_expected, *concat_actual);
+  }
+  {  // Larger case: two 500-entry dictionaries indexed with uint16
+    const int SIZE = 500;
+    auto dict_type = dictionary(uint16(), utf8());
+
+    UInt16Builder index_builder;
+    UInt16Builder expected_index_builder;
+    ASSERT_OK(index_builder.Reserve(SIZE));
+    ASSERT_OK(expected_index_builder.Reserve(SIZE * 2));
+    for (auto i = 0; i < SIZE; i++) {
+      index_builder.UnsafeAppend(i);
+      expected_index_builder.UnsafeAppend(i);
+    }
+    for (auto i = SIZE; i < 2 * SIZE; i++) {
+      expected_index_builder.UnsafeAppend(i);
+    }
+    ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto expected_indices, expected_index_builder.Finish());
+
+    // Creates three dictionaries.  The first maps i->"{i}", the second maps
+    // i->"{500+i}" (500 values each), and the third maps i->"{i}" for 1000 values.
+    // The first and second concatenated should end up equaling the third.  All strings
+    // are zero-padded to length 8 so the data size is known ahead of time.
+    StringBuilder values_one_builder;
+    StringBuilder values_two_builder;
+    ASSERT_OK(values_one_builder.Resize(SIZE));
+    ASSERT_OK(values_two_builder.Resize(SIZE));
+    ASSERT_OK(values_one_builder.ReserveData(8 * SIZE));
+    ASSERT_OK(values_two_builder.ReserveData(8 * SIZE));
+    for (auto i = 0; i < SIZE; i++) {
+      auto i_str = std::to_string(i);
+      auto padded = i_str.insert(0, 8 - i_str.length(), '0');  // pads i_str in place
+      values_one_builder.UnsafeAppend(padded);
+      auto upper_i_str = std::to_string(i + SIZE);
+      auto upper_padded = upper_i_str.insert(0, 8 - upper_i_str.length(), '0');
+      values_two_builder.UnsafeAppend(upper_padded);
+    }
+    ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_one_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_two_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto expected_dictionary,
+                         Concatenate({dictionary_one, dictionary_two}));
+
+    auto one = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_one);
+    auto two = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_two);
+    auto expected = std::make_shared<DictionaryArray>(dict_type, expected_indices,
+                                                      expected_dictionary);
+    ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({one, two}));
+    AssertArraysEqual(*combined, *expected);
+  }
+}
+
+TEST_F(ConcatenateTest, DictionaryTypePartialOverlapDictionaries) {
+  auto dict_type = dictionary(uint8(), utf8());  // shared by both inputs
+  auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
+                                    "[\"A0\", \"A1\", \"C2\", \"C3\"]");
+  auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
+                                    "[\"B0\", \"B1\", \"C2\", \"C3\", \"B4\"]");
+  auto concat_expected =  // "C2"/"C3" are shared, so only B0, B1, B4 get appended
+      DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0, null, 6, 2, 5]",
+                        "[\"A0\", \"A1\", \"C2\", \"C3\", \"B0\", \"B1\", \"B4\"]");
+  ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
+  AssertArraysEqual(*concat_expected, *concat_actual);
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeDifferentSizeIndex) {
+  auto dict_type = dictionary(uint8(), utf8());  // uint8 vs. uint16 index: must fail
+  auto bigger_dict_type = dictionary(uint16(), utf8());  // same values, wider index
+  auto dict_one = DictArrayFromJSON(dict_type, "[0]", "[\"A0\"]");
+  auto dict_two = DictArrayFromJSON(bigger_dict_type, "[0]", "[\"B0\"]");
+  ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeCantUnifyNullInDictionary) {
+  auto dict_type = dictionary(uint8(), utf8());  // dictionaries below contain a null
+  auto dict_one = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"A\"]");
+  auto dict_two = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"B\"]");
+  ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeEnlargedIndices) {
+  auto size = std::numeric_limits<uint8_t>::max() + 1;
+  auto dict_type = dictionary(uint8(), uint16());
+  // One index for every value a uint8 can hold: 0 .. size - 1.
+  UInt8Builder index_builder;
+  ASSERT_OK(index_builder.Reserve(size));
+  for (auto i = 0; i < size; i++) {
+    index_builder.UnsafeAppend(i);
+  }
+  ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish());
+  // Two disjoint dictionaries of `size` values each: [0, size) and [size, 2 * size).
+  UInt16Builder values_builder;
+  ASSERT_OK(values_builder.Reserve(size));
+  UInt16Builder values_builder_two;
+  ASSERT_OK(values_builder_two.Reserve(size));
+  for (auto i = 0; i < size; i++) {
+    values_builder.UnsafeAppend(i);
+    values_builder_two.UnsafeAppend(i + size);
+  }
+  ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_builder.Finish());
+  ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_builder_two.Finish());
+
+  auto dict_one = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_one);
+  auto dict_two = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_two);
+  ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
+  // Expected failure above: 2 * size distinct values cannot be indexed with uint8.
+  auto bigger_dict_type = dictionary(uint16(), uint16());
+  // With uint16 indices it succeeds; dictionary_one doubles as the index array.
+  auto bigger_one =
+      std::make_shared<DictionaryArray>(bigger_dict_type, dictionary_one, dictionary_one);
+  auto bigger_two =
+      std::make_shared<DictionaryArray>(bigger_dict_type, dictionary_one, dictionary_two);
+  ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({bigger_one, bigger_two}));
+  ASSERT_EQ(size * 2, combined->length());
+}
+
 TEST_F(ConcatenateTest, DISABLED_UnionType) {
   // sparse mode
   Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index a1071c4..127ed59 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1367,6 +1367,12 @@ class ARROW_EXPORT DictionaryUnifier {
   /// after this is called
   virtual Status GetResult(std::shared_ptr<DataType>* out_type,
                            std::shared_ptr<Array>* out_dict) = 0;
+
+  /// \brief Return a unified dictionary with the given index type.  If the unified
+  /// dictionary is too large for the index type, an Invalid status is returned.
+  /// The unifier cannot be used after this is called.
+  virtual Status GetResultWithIndexType(std::shared_ptr<DataType> index_type,
+                                        std::shared_ptr<Array>* out_dict) = 0;
 };
 
 // ----------------------------------------------------------------------