You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/04 16:06:05 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #8984: ARROW-5336: [C++] Implement arrow::Concatenate for dictionary-encoded arrays with unequal dictionaries

pitrou commented on a change in pull request #8984:
URL: https://github.com/apache/arrow/pull/8984#discussion_r551396579



##########
File path: cpp/src/arrow/array/concatenate_test.cc
##########
@@ -225,6 +225,86 @@ TEST_F(ConcatenateTest, DictionaryType) {
   });
 }
 
+TEST_F(ConcatenateTest, DictionaryTypeDifferentDictionaries) {
+  auto dict_type = dictionary(uint8(), utf8());
+  auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
+                                    "[\"A0\", \"A1\", \"A2\", \"A3\"]");
+  auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
+                                    "[\"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
+  auto concat_expected = DictArrayFromJSON(
+      dict_type, "[1, 2, null, 3, 0, null, 8, 6, 5]",
+      "[\"A0\", \"A1\", \"A2\", \"A3\", \"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
+  ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
+  AssertArraysEqual(*concat_expected, *concat_actual);
+}
+
+TEST_F(ConcatenateTest, DictionaryTypePartialOverlapDictionaries) {
+  auto dict_type = dictionary(uint8(), utf8());
+  auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
+                                    "[\"A0\", \"A1\", \"C2\", \"C3\"]");
+  auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
+                                    "[\"B0\", \"B1\", \"C2\", \"C3\", \"B4\"]");
+  auto concat_expected =
+      DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0, null, 6, 2, 5]",
+                        "[\"A0\", \"A1\", \"C2\", \"C3\", \"B0\", \"B1\", \"B4\"]");
+  ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
+  AssertArraysEqual(*concat_expected, *concat_actual);
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeDifferentSizeIndex) {
+  auto dict_type = dictionary(uint8(), utf8());
+  auto bigger_dict_type = dictionary(uint16(), utf8());
+  auto dict_one = DictArrayFromJSON(dict_type, "[0]", "[\"A0\"]");
+  auto dict_two = DictArrayFromJSON(bigger_dict_type, "[0]", "[\"B0\"]");
+  ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeCantUnifyNullInDictionary) {
+  auto dict_type = dictionary(uint8(), utf8());
+  auto dict_one = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"A\"]");
+  auto dict_two = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"B\"]");
+  ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
+}
+
+TEST_F(ConcatenateTest, DictionaryTypeEnlargedIndices) {
+  auto size = std::numeric_limits<uint8_t>::max() + 1;
+  auto dict_type = dictionary(uint8(), uint16());
+
+  UInt8Builder index_builder;
+  ASSERT_OK(index_builder.Reserve(size));
+  for (auto i = 0; i < size; i++) {
+    index_builder.UnsafeAppend(i);
+  }
+  ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish());
+
+  UInt16Builder values_builder;
+  ASSERT_OK(values_builder.Reserve(size));
+  for (auto i = 0; i < size; i++) {
+    values_builder.UnsafeAppend(i);
+  }
+  ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_builder.Finish());
+
+  UInt16Builder values_builder_two;
+  ASSERT_OK(values_builder_two.Reserve(size));
+  for (auto i = size; i < 2 * size; i++) {

Review comment:
       Did you mean `i += 2`? Or simply `UnsafeAppend(i * 2)`?

##########
File path: cpp/src/arrow/array/array_dict.cc
##########
@@ -44,6 +44,29 @@ namespace arrow {
 using internal::checked_cast;
 using internal::CopyBitmap;
 
+struct MaxIndexAccessor {
+  MaxIndexAccessor() {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T&) {
+    return Status::Invalid("Dictionary index types must be integer types");
+  }
+
+  template <typename T>
+  enable_if_integer<T, Status> Visit(const T&) {
+    max_index_value_ = std::numeric_limits<typename T::c_type>::max();
+    return Status::OK();
+  }
+
+  int64_t max_index_value_ = 0;
+};
+
+Result<int64_t> DictionaryIndexMaxValue(std::shared_ptr<DataType> index_type) {

Review comment:
       Returning `int64_t` is not right if the index type is `uint64_t`.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -274,7 +329,12 @@ class ConcatenateImpl {
       ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));
       return ConcatenateBuffers(index_buffers, pool_).Value(&out_->buffers[1]);
     } else {
-      return Status::NotImplemented("Concat with dictionary unification NYI");
+      ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));

Review comment:
       This seems common to both `if` branches.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();

Review comment:
       Is `out_data` a `CType*`? If so, spell it out explicitly for clarity. (Is a `reinterpret_cast` missing too?)

##########
File path: cpp/src/arrow/type.h
##########
@@ -1367,6 +1367,12 @@ class ARROW_EXPORT DictionaryUnifier {
   /// after this is called
   virtual Status GetResult(std::shared_ptr<DataType>* out_type,
                            std::shared_ptr<Array>* out_dict) = 0;
+
+  /// \brief Return a result DictionaryType with the given index type.  If
+  /// the index type is not large enough then an invalid status will be returned.
+  /// The unifier cannot be used after this is called
+  virtual Status GetResultWithIndexType(std::shared_ptr<DataType> index_type,

Review comment:
       Isn't the docstring a bit off? It doesn't seem that a `DictionaryType` is returned.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      auto buffer = index_buffers_[i];
+      auto old_indices = reinterpret_cast<const CType*>(buffer->data());
+      auto indices_map = reinterpret_cast<int32_t*>(index_lookup_[i]->mutable_data());

Review comment:
       Why `mutable_data`? A `const` pointer should be sufficient here.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      auto buffer = index_buffers_[i];
+      auto old_indices = reinterpret_cast<const CType*>(buffer->data());
+      auto indices_map = reinterpret_cast<int32_t*>(index_lookup_[i]->mutable_data());
+      for (int64_t j = 0; j < buffer->size(); j++) {
+        out_data[j] = indices_map[old_indices[j]];
+      }
+      out_data += buffer->size();
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<Buffer> out_;
+  BufferVector& index_buffers_;

Review comment:
       Our [coding guidelines](https://arrow.apache.org/docs/developers/cpp/development.html#code-style-linting-and-ci) (mostly the Google C++ style guide) strongly recommend const-references.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      auto buffer = index_buffers_[i];
+      auto old_indices = reinterpret_cast<const CType*>(buffer->data());
+      auto indices_map = reinterpret_cast<int32_t*>(index_lookup_[i]->mutable_data());
+      for (int64_t j = 0; j < buffer->size(); j++) {
+        out_data[j] = indices_map[old_indices[j]];

Review comment:
       There are transposition utilities in `util/int_util.h`. They may be more optimized than this scalar loop.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      auto buffer = index_buffers_[i];

Review comment:
       Nit: `const auto&`? Though copying the `shared_ptr` is probably not a bottleneck here...

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {
+      auto buffer = index_buffers_[i];
+      auto old_indices = reinterpret_cast<const CType*>(buffer->data());
+      auto indices_map = reinterpret_cast<int32_t*>(index_lookup_[i]->mutable_data());
+      for (int64_t j = 0; j < buffer->size(); j++) {
+        out_data[j] = indices_map[old_indices[j]];
+      }
+      out_data += buffer->size();
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<Buffer> out_;
+  BufferVector& index_buffers_;
+  std::vector<std::shared_ptr<Buffer>>& index_lookup_;

Review comment:
       Also const-ref. And why not use `BufferVector` here too?

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {

Review comment:
       While we are at it, can all non-public functions/classes in this module be put in the anonymous namespace? This reduces the number of exported symbols and can also open more optimization opportunities for the compiler.

##########
File path: cpp/src/arrow/array/concatenate_test.cc
##########
@@ -225,6 +225,86 @@ TEST_F(ConcatenateTest, DictionaryType) {
   });
 }
 
+TEST_F(ConcatenateTest, DictionaryTypeDifferentDictionaries) {
+  auto dict_type = dictionary(uint8(), utf8());

Review comment:
       Hmm... can you test with index types larger than 1 byte? There may be an issue with them in the current impl (not sure).

##########
File path: cpp/src/arrow/array/array_dict.cc
##########
@@ -44,6 +44,29 @@ namespace arrow {
 using internal::checked_cast;
 using internal::CopyBitmap;
 
+struct MaxIndexAccessor {

Review comment:
       Which numeric type would you return for the min/max of all numeric types? Let's just stick with integers :-)
   As for where to put it, `util/int_util.h` sounds like a decent place.

##########
File path: cpp/src/arrow/array/concatenate.cc
##########
@@ -163,6 +163,46 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
   return Status::OK();
 }
 
+struct DictionaryConcatenate {
+  DictionaryConcatenate(BufferVector& index_buffers,
+                        std::vector<std::shared_ptr<Buffer>>& index_lookup,
+                        MemoryPool* pool)
+      : out_(nullptr),
+        index_buffers_(index_buffers),
+        index_lookup_(index_lookup),
+        pool_(pool) {}
+
+  template <typename T>
+  enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
+    return Status::Invalid("Dictionary indices must be integral types");
+  }
+
+  template <typename T, typename CType = typename T::c_type>
+  enable_if_integer<T, Status> Visit(const T& index_type) {
+    int64_t out_length = 0;
+    for (const auto& buffer : index_buffers_) {
+      out_length += buffer->size();
+    }
+    ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
+    auto out_data = out_->mutable_data();
+    for (size_t i = 0; i < index_buffers_.size(); i++) {

Review comment:
       Doing this is more readable than manually incrementing a pair of iterators in lockstep, IMHO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org