Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/27 16:12:33 UTC

[arrow] branch master updated: ARROW-9132: [C++] Support Unique and ValueCounts on dictionary data with non-changing dictionaries, add ChunkedArray::Make validating constructor

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ba827  ARROW-9132: [C++] Support Unique and ValueCounts on dictionary data with non-changing dictionaries, add ChunkedArray::Make validating constructor
50ba827 is described below

commit 50ba827890ec9e380a0ae95ee11956443db5b84b
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sat Jun 27 11:12:09 2020 -0500

    ARROW-9132: [C++] Support Unique and ValueCounts on dictionary data with non-changing dictionaries, add ChunkedArray::Make validating constructor
    
    This dispatches to the hash function for the dictionary indices, while checking that the dictionaries stay the same across the processed chunks.
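    
    For illustration, a minimal caller-side sketch of the new behavior (a
    hypothetical example, not part of this patch; it assumes using-directives
    for the arrow namespace, the ArrayFromJSON helper from
    arrow/testing/gtest_util.h, and the vector functions from
    arrow/compute/api.h):
    
        // Hypothetical sketch, not from the patch.
        Status DictionaryUniqueExample() {
          auto indices = ArrayFromJSON(int8(), "[0, 1, 0, 2]");
          auto dict = ArrayFromJSON(utf8(), R"(["a", "b", "c"])");
          ARROW_ASSIGN_OR_RAISE(
              auto arr, DictionaryArray::FromArrays(dictionary(int8(), utf8()),
                                                    indices, dict));
          // Chunks must carry equal dictionaries; otherwise Unique and
          // ValueCounts return Status::Invalid.
          ARROW_ASSIGN_OR_RAISE(
              auto chunked, ChunkedArray::Make({arr->Slice(0, 2), arr->Slice(2)}));
          ARROW_ASSIGN_OR_RAISE(auto uniques, compute::Unique(chunked));
          ARROW_ASSIGN_OR_RAISE(auto counts, compute::ValueCounts(chunked));
          // ... inspect uniques and counts ...
          return Status::OK();
        }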
    
    Closes #7551 from wesm/ARROW-9132
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/chunked_array.cc                    |  18 ++
 cpp/src/arrow/chunked_array.h                     |   4 +
 cpp/src/arrow/chunked_array_test.cc               |  19 ++
 cpp/src/arrow/compute/kernels/vector_hash.cc      | 226 ++++++++++++++++------
 cpp/src/arrow/compute/kernels/vector_hash_test.cc |  37 +++-
 python/pyarrow/tests/test_array.py                |  15 ++
 6 files changed, 261 insertions(+), 58 deletions(-)

diff --git a/cpp/src/arrow/chunked_array.cc b/cpp/src/arrow/chunked_array.cc
index 4ed4245..31b1378 100644
--- a/cpp/src/arrow/chunked_array.cc
+++ b/cpp/src/arrow/chunked_array.cc
@@ -64,6 +64,24 @@ ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type)
   }
 }
 
+Result<std::shared_ptr<ChunkedArray>> ChunkedArray::Make(ArrayVector chunks,
+                                                         std::shared_ptr<DataType> type) {
+  if (type == nullptr) {
+    if (chunks.size() == 0) {
+      return Status::Invalid(
+          "cannot construct ChunkedArray from empty vector "
+          "and omitted type");
+    }
+    type = chunks[0]->type();
+  }
+  for (size_t i = 0; i < chunks.size(); ++i) {
+    if (!chunks[i]->type()->Equals(*type)) {
+      return Status::Invalid("Array chunks must all be same type");
+    }
+  }
+  return std::make_shared<ChunkedArray>(std::move(chunks), std::move(type));
+}
+
 bool ChunkedArray::Equals(const ChunkedArray& other) const {
   if (length_ != other.length()) {
     return false;
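
A hypothetical call-site sketch (not part of the patch) of the validating factory added above; when the type argument is omitted, it is inferred from the first chunk:

    #include <memory>
    #include <utility>
    #include "arrow/api.h"

    // Hypothetical caller, not in the patch: Make performs the validation the
    // bare constructor assumes has already been done, surfacing mismatched
    // chunk types as Status::Invalid instead of constructing an inconsistent
    // ChunkedArray.
    arrow::Status ConsumeChunks(arrow::ArrayVector chunks) {
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ChunkedArray> chunked,
                            arrow::ChunkedArray::Make(std::move(chunks)));
      // ... use chunked ...
      return arrow::Status::OK();
    }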
diff --git a/cpp/src/arrow/chunked_array.h b/cpp/src/arrow/chunked_array.h
index c16d66f..b1f8d6c 100644
--- a/cpp/src/arrow/chunked_array.h
+++ b/cpp/src/arrow/chunked_array.h
@@ -81,6 +81,10 @@ class ARROW_EXPORT ChunkedArray {
   /// As the data type is passed explicitly, the vector may be empty.
   ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type);
 
+  /// \brief Constructor with basic input validation.
+  static Result<std::shared_ptr<ChunkedArray>> Make(
+      ArrayVector chunks, std::shared_ptr<DataType> type = NULLPTR);
+
   /// \return the total length of the chunked array; computed on construction
   int64_t length() const { return length_; }
 
diff --git a/cpp/src/arrow/chunked_array_test.cc b/cpp/src/arrow/chunked_array_test.cc
index 5074ead..2e2107f 100644
--- a/cpp/src/arrow/chunked_array_test.cc
+++ b/cpp/src/arrow/chunked_array_test.cc
@@ -48,6 +48,25 @@ class TestChunkedArray : public TestBase {
   std::shared_ptr<ChunkedArray> another_;
 };
 
+TEST_F(TestChunkedArray, Make) {
+  ASSERT_RAISES(Invalid, ChunkedArray::Make({}));
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ChunkedArray> result,
+                       ChunkedArray::Make({}, int64()));
+  AssertTypeEqual(*int64(), *result->type());
+  ASSERT_EQ(result->num_chunks(), 0);
+
+  auto chunk0 = ArrayFromJSON(int8(), "[0, 1, 2]");
+  auto chunk1 = ArrayFromJSON(int16(), "[3, 4, 5]");
+
+  ASSERT_OK_AND_ASSIGN(result, ChunkedArray::Make({chunk0, chunk0}));
+  ASSERT_OK_AND_ASSIGN(auto result2, ChunkedArray::Make({chunk0, chunk0}, int8()));
+  AssertChunkedEqual(*result, *result2);
+
+  ASSERT_RAISES(Invalid, ChunkedArray::Make({chunk0, chunk1}));
+  ASSERT_RAISES(Invalid, ChunkedArray::Make({chunk0}, int16()));
+}
+
 TEST_F(TestChunkedArray, BasicEquals) {
   std::vector<bool> null_bitmap(100, true);
   std::vector<int32_t> data(100, 1);
diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc
index 1cd677e..f8d1cff 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -28,6 +28,7 @@
 #include "arrow/compute/kernels/common.h"
 #include "arrow/result.h"
 #include "arrow/util/hashing.h"
+#include "arrow/util/make_unique.h"
 
 namespace arrow {
 
@@ -207,9 +208,7 @@ class HashKernel : public KernelState {
  public:
   // Reset for another run.
   virtual Status Reset() = 0;
-  // Prepare the Action for the given input (e.g. reserve appropriately sized
-  // data structures) and visit the given input with Action.
-  virtual Status Append(KernelContext* ctx, const ArrayData& input) = 0;
+
   // Flush out accumulated results from the last invocation of Call.
   virtual Status Flush(Datum* out) = 0;
   // Flush out accumulated results across all invocations of Call. The kernel
@@ -217,18 +216,16 @@ class HashKernel : public KernelState {
   virtual Status FlushFinal(Datum* out) = 0;
   // Get the values (keys) accumulated in the dictionary so far.
   virtual Status GetDictionary(std::shared_ptr<ArrayData>* out) = 0;
-};
 
-// ----------------------------------------------------------------------
-// Base class for all hash kernel implementations
+  virtual std::shared_ptr<DataType> value_type() const = 0;
 
-class HashKernelImpl : public HashKernel {
- public:
-  Status Append(KernelContext* ctx, const ArrayData& input) override {
+  Status Append(KernelContext* ctx, const ArrayData& input) {
     std::lock_guard<std::mutex> guard(lock_);
     return Append(input);
   }
 
+  // Prepare the Action for the given input (e.g. reserve appropriately sized
+  // data structures) and visit the given input with Action.
   virtual Status Append(const ArrayData& arr) = 0;
 
  protected:
@@ -242,9 +239,9 @@ class HashKernelImpl : public HashKernel {
 template <typename Type, typename Scalar, typename Action,
           bool with_error_status = Action::with_error_status,
           bool with_memo_visit_null = Action::with_memo_visit_null>
-class RegularHashKernelImpl : public HashKernelImpl {
+class RegularHashKernel : public HashKernel {
  public:
-  RegularHashKernelImpl(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+  RegularHashKernel(const std::shared_ptr<DataType>& type, MemoryPool* pool)
       : pool_(pool), type_(type), action_(type, pool) {}
 
   Status Reset() override {
@@ -266,6 +263,8 @@ class RegularHashKernelImpl : public HashKernelImpl {
                                                           0 /* start_offset */, out);
   }
 
+  std::shared_ptr<DataType> value_type() const override { return type_; }
+
   template <bool HasError = with_error_status>
   enable_if_t<!HasError, Status> DoAppend(const ArrayData& arr) {
     return VisitArrayDataInline<Type>(
@@ -347,9 +346,9 @@ class RegularHashKernelImpl : public HashKernelImpl {
 // Hash kernel implementation for nulls
 
 template <typename Action>
-class NullHashKernelImpl : public HashKernelImpl {
+class NullHashKernel : public HashKernel {
  public:
-  NullHashKernelImpl(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+  NullHashKernel(const std::shared_ptr<DataType>& type, MemoryPool* pool)
       : pool_(pool), type_(type), action_(type, pool) {}
 
   Status Reset() override { return action_.Reset(); }
@@ -376,6 +375,8 @@ class NullHashKernelImpl : public HashKernelImpl {
     return Status::OK();
   }
 
+  std::shared_ptr<DataType> value_type() const override { return type_; }
+
  protected:
   MemoryPool* pool_;
   std::shared_ptr<DataType> type_;
@@ -383,44 +384,83 @@ class NullHashKernelImpl : public HashKernelImpl {
 };
 
 // ----------------------------------------------------------------------
-// Kernel wrapper for generic hash table kernels
+// Hashing for dictionary type
+
+class DictionaryHashKernel : public HashKernel {
+ public:
+  explicit DictionaryHashKernel(std::unique_ptr<HashKernel> indices_kernel)
+      : indices_kernel_(std::move(indices_kernel)) {}
+
+  Status Reset() override { return indices_kernel_->Reset(); }
+
+  Status HandleDictionary(const std::shared_ptr<ArrayData>& dict) {
+    if (!dictionary_) {
+      dictionary_ = dict;
+    } else if (!MakeArray(dictionary_)->Equals(*MakeArray(dict))) {
+      return Status::Invalid(
+          "Only hashing for data with equal dictionaries "
+          "currently supported");
+    }
+    return Status::OK();
+  }
+
+  Status Append(const ArrayData& arr) override {
+    RETURN_NOT_OK(HandleDictionary(arr.dictionary));
+    return indices_kernel_->Append(arr);
+  }
+
+  Status Flush(Datum* out) override { return indices_kernel_->Flush(out); }
+
+  Status FlushFinal(Datum* out) override { return indices_kernel_->FlushFinal(out); }
+
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    return indices_kernel_->GetDictionary(out);
+  }
+
+  std::shared_ptr<DataType> value_type() const override {
+    return indices_kernel_->value_type();
+  }
+
+  std::shared_ptr<ArrayData> dictionary() const { return dictionary_; }
+
+ private:
+  std::unique_ptr<HashKernel> indices_kernel_;
+  std::shared_ptr<ArrayData> dictionary_;
+};
+
+// ----------------------------------------------------------------------
 
 template <typename Type, typename Action, typename Enable = void>
 struct HashKernelTraits {};
 
 template <typename Type, typename Action>
 struct HashKernelTraits<Type, Action, enable_if_null<Type>> {
-  using HashKernelImpl = NullHashKernelImpl<Action>;
+  using HashKernel = NullHashKernel<Action>;
 };
 
 template <typename Type, typename Action>
 struct HashKernelTraits<Type, Action, enable_if_has_c_type<Type>> {
-  using HashKernelImpl = RegularHashKernelImpl<Type, typename Type::c_type, Action>;
+  using HashKernel = RegularHashKernel<Type, typename Type::c_type, Action>;
 };
 
 template <typename Type, typename Action>
 struct HashKernelTraits<Type, Action, enable_if_has_string_view<Type>> {
-  using HashKernelImpl = RegularHashKernelImpl<Type, util::string_view, Action>;
+  using HashKernel = RegularHashKernel<Type, util::string_view, Action>;
 };
 
-template <typename T, typename R = void>
-using enable_if_can_hash =
-    enable_if_t<is_null_type<T>::value || has_c_type<T>::value ||
-                    is_base_binary_type<T>::value || is_fixed_size_binary_type<T>::value,
-                R>;
-
 template <typename Type, typename Action>
-struct HashInitFunctor {
-  using HashKernelType = typename HashKernelTraits<Type, Action>::HashKernelImpl;
+std::unique_ptr<HashKernel> HashInitImpl(KernelContext* ctx, const KernelInitArgs& args) {
+  using HashKernelType = typename HashKernelTraits<Type, Action>::HashKernel;
+  auto result = ::arrow::internal::make_unique<HashKernelType>(args.inputs[0].type,
+                                                               ctx->memory_pool());
+  ctx->SetStatus(result->Reset());
+  return std::move(result);
+}
 
-  static std::unique_ptr<KernelState> Init(KernelContext* ctx,
-                                           const KernelInitArgs& args) {
-    auto result = std::unique_ptr<HashKernel>(
-        new HashKernelType(args.inputs[0].type, ctx->memory_pool()));
-    ctx->SetStatus(result->Reset());
-    return std::move(result);
-  }
-};
+template <typename Type, typename Action>
+std::unique_ptr<KernelState> HashInit(KernelContext* ctx, const KernelInitArgs& args) {
+  return std::move(HashInitImpl<Type, Action>(ctx, args));
+}
 
 template <typename Action>
 KernelInit GetHashInit(Type::type type_id) {
@@ -428,21 +468,21 @@ KernelInit GetHashInit(Type::type type_id) {
   // representation
   switch (type_id) {
     case Type::NA:
-      return HashInitFunctor<NullType, Action>::Init;
+      return HashInit<NullType, Action>;
     case Type::BOOL:
-      return HashInitFunctor<BooleanType, Action>::Init;
+      return HashInit<BooleanType, Action>;
     case Type::INT8:
     case Type::UINT8:
-      return HashInitFunctor<UInt8Type, Action>::Init;
+      return HashInit<UInt8Type, Action>;
     case Type::INT16:
     case Type::UINT16:
-      return HashInitFunctor<UInt16Type, Action>::Init;
+      return HashInit<UInt16Type, Action>;
     case Type::INT32:
     case Type::UINT32:
     case Type::FLOAT:
     case Type::DATE32:
     case Type::TIME32:
-      return HashInitFunctor<UInt32Type, Action>::Init;
+      return HashInit<UInt32Type, Action>;
     case Type::INT64:
     case Type::UINT64:
     case Type::DOUBLE:
@@ -450,22 +490,47 @@ KernelInit GetHashInit(Type::type type_id) {
     case Type::TIME64:
     case Type::TIMESTAMP:
     case Type::DURATION:
-      return HashInitFunctor<UInt64Type, Action>::Init;
+      return HashInit<UInt64Type, Action>;
     case Type::BINARY:
     case Type::STRING:
-      return HashInitFunctor<BinaryType, Action>::Init;
+      return HashInit<BinaryType, Action>;
     case Type::LARGE_BINARY:
     case Type::LARGE_STRING:
-      return HashInitFunctor<LargeBinaryType, Action>::Init;
+      return HashInit<LargeBinaryType, Action>;
     case Type::FIXED_SIZE_BINARY:
     case Type::DECIMAL:
-      return HashInitFunctor<FixedSizeBinaryType, Action>::Init;
+      return HashInit<FixedSizeBinaryType, Action>;
     default:
       DCHECK(false);
       return nullptr;
   }
 }
 
+template <typename Action>
+std::unique_ptr<KernelState> DictionaryHashInit(KernelContext* ctx,
+                                                const KernelInitArgs& args) {
+  const auto& dict_type = checked_cast<const DictionaryType&>(*args.inputs[0].type);
+  std::unique_ptr<HashKernel> indices_hasher;
+  switch (dict_type.index_type()->id()) {
+    case Type::INT8:
+      indices_hasher = HashInitImpl<UInt8Type, Action>(ctx, args);
+      break;
+    case Type::INT16:
+      indices_hasher = HashInitImpl<UInt16Type, Action>(ctx, args);
+      break;
+    case Type::INT32:
+      indices_hasher = HashInitImpl<UInt32Type, Action>(ctx, args);
+      break;
+    case Type::INT64:
+      indices_hasher = HashInitImpl<UInt64Type, Action>(ctx, args);
+      break;
+    default:
+      DCHECK(false) << "Unsupported dictionary index type";
+      break;
+  }
+  return ::arrow::internal::make_unique<DictionaryHashKernel>(std::move(indices_hasher));
+}
+
 void HashExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
   auto hash_impl = checked_cast<HashKernel*>(ctx->state());
   KERNEL_RETURN_IF_ERROR(ctx, hash_impl->Append(ctx, *batch[0].array()));
@@ -491,18 +556,41 @@ void DictEncodeFinalize(KernelContext* ctx, std::vector<Datum>* out) {
   }
 }
 
+std::shared_ptr<ArrayData> BoxValueCounts(const std::shared_ptr<ArrayData>& uniques,
+                                          const std::shared_ptr<ArrayData>& counts) {
+  auto data_type =
+      struct_({field(kValuesFieldName, uniques->type), field(kCountsFieldName, int64())});
+  ArrayVector children = {MakeArray(uniques), MakeArray(counts)};
+  return std::make_shared<StructArray>(data_type, uniques->length, children)->data();
+}
+
 void ValueCountsFinalize(KernelContext* ctx, std::vector<Datum>* out) {
   auto hash_impl = checked_cast<HashKernel*>(ctx->state());
   std::shared_ptr<ArrayData> uniques;
+  Datum value_counts;
+
   KERNEL_RETURN_IF_ERROR(ctx, hash_impl->GetDictionary(&uniques));
+  KERNEL_RETURN_IF_ERROR(ctx, hash_impl->FlushFinal(&value_counts));
+  *out = {Datum(BoxValueCounts(uniques, value_counts.array()))};
+}
 
+void UniqueFinalizeDictionary(KernelContext* ctx, std::vector<Datum>* out) {
+  UniqueFinalize(ctx, out);
+  if (ctx->HasError()) {
+    return;
+  }
+  auto hash = checked_cast<DictionaryHashKernel*>(ctx->state());
+  (*out)[0].mutable_array()->dictionary = hash->dictionary();
+}
+
+void ValueCountsFinalizeDictionary(KernelContext* ctx, std::vector<Datum>* out) {
+  auto hash = checked_cast<DictionaryHashKernel*>(ctx->state());
+  std::shared_ptr<ArrayData> uniques;
   Datum value_counts;
-  KERNEL_RETURN_IF_ERROR(ctx, hash_impl->FlushFinal(&value_counts));
-  auto data_type =
-      struct_({field(kValuesFieldName, uniques->type), field(kCountsFieldName, int64())});
-  ArrayVector children = {MakeArray(uniques), value_counts.make_array()};
-  auto result = std::make_shared<StructArray>(data_type, uniques->length, children);
-  *out = {Datum(result)};
+  KERNEL_RETURN_IF_ERROR(ctx, hash->GetDictionary(&uniques));
+  KERNEL_RETURN_IF_ERROR(ctx, hash->FlushFinal(&value_counts));
+  uniques->dictionary = hash->dictionary();
+  *out = {Datum(BoxValueCounts(uniques, value_counts.array()))};
 }
 
 ValueDescr DictEncodeOutput(KernelContext*, const std::vector<ValueDescr>& descrs) {
@@ -515,9 +603,7 @@ ValueDescr ValueCountsOutput(KernelContext*, const std::vector<ValueDescr>& desc
 }
 
 template <typename Action>
-void AddHashKernels(VectorFunction* func, VectorKernel base,
-                    OutputType::Resolver out_resolver) {
-  OutputType out_ty(out_resolver);
+void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty) {
   for (const auto& ty : PrimitiveTypes()) {
     base.init = GetHashInit<Action>(ty->id());
     base.signature = KernelSignature::Make({InputType::Array(ty)}, out_ty);
@@ -544,24 +630,54 @@ void RegisterVectorHash(FunctionRegistry* registry) {
   VectorKernel base;
   base.exec = HashExec;
 
-  // Unique and ValueCounts output unchunked arrays
+  // ----------------------------------------------------------------------
+  // unique
 
   base.finalize = UniqueFinalize;
   base.output_chunked = false;
   auto unique = std::make_shared<VectorFunction>("unique", Arity::Unary());
-  AddHashKernels<UniqueAction>(unique.get(), base, /*output_type=*/FirstType);
+  AddHashKernels<UniqueAction>(unique.get(), base, OutputType(FirstType));
+
+  // Dictionary unique
+  base.init = DictionaryHashInit<UniqueAction>;
+  base.finalize = UniqueFinalizeDictionary;
+  base.signature =
+      KernelSignature::Make({InputType::Array(Type::DICTIONARY)}, OutputType(FirstType));
+  DCHECK_OK(unique->AddKernel(base));
+
   DCHECK_OK(registry->AddFunction(std::move(unique)));
 
+  // ----------------------------------------------------------------------
+  // value_counts
+
   base.finalize = ValueCountsFinalize;
   auto value_counts = std::make_shared<VectorFunction>("value_counts", Arity::Unary());
-  AddHashKernels<ValueCountsAction>(value_counts.get(), base, ValueCountsOutput);
+  AddHashKernels<ValueCountsAction>(value_counts.get(), base,
+                                    OutputType(ValueCountsOutput));
+
+  // Dictionary value counts
+  base.init = DictionaryHashInit<ValueCountsAction>;
+  base.finalize = ValueCountsFinalizeDictionary;
+  base.signature = KernelSignature::Make({InputType::Array(Type::DICTIONARY)},
+                                         OutputType(ValueCountsOutput));
+  DCHECK_OK(value_counts->AddKernel(base));
+
   DCHECK_OK(registry->AddFunction(std::move(value_counts)));
 
+  // ----------------------------------------------------------------------
+  // dictionary_encode
+
   base.finalize = DictEncodeFinalize;
+  // Unlike unique and value_counts above, dictionary_encode can output
+  // chunked arrays
   base.output_chunked = true;
   auto dict_encode =
       std::make_shared<VectorFunction>("dictionary_encode", Arity::Unary());
-  AddHashKernels<DictEncodeAction>(dict_encode.get(), base, DictEncodeOutput);
+  AddHashKernels<DictEncodeAction>(dict_encode.get(), base, OutputType(DictEncodeOutput));
+
+  // Calling dictionary_encode on dictionary input is not supported, but if
+  // it ends up being needed (or convenient), a kernel could be added to make
+  // it a no-op
+
   DCHECK_OK(registry->AddFunction(std::move(dict_encode)));
 }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_hash_test.cc b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
index 83e8240..7641861 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
@@ -54,7 +54,8 @@ namespace compute {
 // ----------------------------------------------------------------------
 // Dictionary tests
 
-void CheckUnique(const std::shared_ptr<Array>& input,
+template <typename T>
+void CheckUnique(const std::shared_ptr<T>& input,
                  const std::shared_ptr<Array>& expected) {
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> result, Unique(input));
   ASSERT_OK(result->ValidateFull());
@@ -68,7 +69,6 @@ void CheckUnique(const std::shared_ptr<DataType>& type, const std::vector<T>& in
                  const std::vector<bool>& out_is_valid) {
   std::shared_ptr<Array> input = _MakeArray<Type, T>(type, in_values, in_is_valid);
   std::shared_ptr<Array> expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
-
   CheckUnique(input, expected);
 }
 
@@ -91,7 +91,8 @@ void CheckValueCountsNull(const std::shared_ptr<DataType>& type) {
   ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName(kCountsFieldName));
 }
 
-void CheckValueCounts(const std::shared_ptr<Array>& input,
+template <typename T>
+void CheckValueCounts(const std::shared_ptr<T>& input,
                       const std::shared_ptr<Array>& expected_values,
                       const std::shared_ptr<Array>& expected_counts) {
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> result, ValueCounts(input));
@@ -558,6 +559,36 @@ TEST_F(TestHashKernel, DictEncodeDecimal) {
                                               {}, {0, 0, 1, 0, 2});
 }
 
+TEST_F(TestHashKernel, DictionaryUniqueAndValueCounts) {
+  for (auto index_ty : {int8(), int16(), int32(), int64()}) {
+    auto indices = ArrayFromJSON(index_ty, "[3, 0, 0, 0, 1, 1, 3, 0, 1, 3, 0, 1]");
+    auto dict = ArrayFromJSON(int64(), "[10, 20, 30, 40]");
+
+    auto dict_ty = dictionary(index_ty, int64());
+
+    auto ex_indices = ArrayFromJSON(index_ty, "[3, 0, 1]");
+
+    auto input = std::make_shared<DictionaryArray>(dict_ty, indices, dict);
+    auto ex_uniques = std::make_shared<DictionaryArray>(dict_ty, ex_indices, dict);
+    CheckUnique(input, ex_uniques);
+
+    auto ex_counts = ArrayFromJSON(int64(), "[3, 5, 4]");
+    CheckValueCounts(input, ex_uniques, ex_counts);
+
+    // Check chunked array
+    auto chunked = *ChunkedArray::Make({input->Slice(0, 2), input->Slice(2)});
+    CheckUnique(chunked, ex_uniques);
+    CheckValueCounts(chunked, ex_uniques, ex_counts);
+
+    // Different dictionaries not supported
+    auto dict2 = ArrayFromJSON(int64(), "[30, 40, 50, 60]");
+    auto input2 = std::make_shared<DictionaryArray>(dict_ty, indices, dict2);
+    auto different_dictionaries = *ChunkedArray::Make({input, input2});
+    ASSERT_RAISES(Invalid, Unique(different_dictionaries));
+    ASSERT_RAISES(Invalid, ValueCounts(different_dictionaries));
+  }
+}
+
 /* TODO(ARROW-4124): Determine if we want to do something that is reproducible with
  * floats.
 TEST_F(TestHashKernel, ValueCountsFloat) {
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 3cb3091..5d806c9 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -1316,6 +1316,21 @@ def test_value_counts_simple():
             assert result.field("counts").equals(expected_counts)
 
 
+def test_unique_value_counts_dictionary_type():
+    indices = pa.array([3, 0, 0, 0, 1, 1, 3, 0, 1, 3, 0, 1])
+    dictionary = pa.array(['foo', 'bar', 'baz', 'qux'])
+
+    arr = pa.DictionaryArray.from_arrays(indices, dictionary)
+
+    unique_result = arr.unique()
+    expected = pa.DictionaryArray.from_arrays(indices.unique(), dictionary)
+    assert unique_result.equals(expected)
+
+    result = arr.value_counts()
+    assert result.field('values').equals(unique_result)
+    assert result.field('counts').equals(pa.array([3, 5, 4], type='int64'))
+
+
 def test_dictionary_encode_simple():
     cases = [
         (pa.array([1, 2, 3, None, 1, 2, 3]),