You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/17 23:29:52 UTC

[arrow] branch master updated: ARROW-1559: [C++] Add Unique kernel and refactor DictionaryBuilder to be a stateful kernel

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f2806fa  ARROW-1559: [C++] Add Unique kernel and refactor DictionaryBuilder to be a stateful kernel
f2806fa is described below

commit f2806fa518583907a129b2ecb0b7ec8758b69e17
Author: Wes McKinney <we...@twosigma.com>
AuthorDate: Fri Nov 17 18:29:49 2017 -0500

    ARROW-1559: [C++] Add Unique kernel and refactor DictionaryBuilder to be a stateful kernel
    
    Originally intended only to implement selective categorical conversion in `to_pandas()`, but it seems that a lot of functionality is missing to do this cleanly.
    
    Author: Wes McKinney <we...@twosigma.com>
    
    Closes #1266 from xhochy/ARROW-1559 and squashes the following commits:
    
    50249652 [Wes McKinney] Fix MSVC linker issue
    b6cb1ece [Wes McKinney] Export CastOptions
    4ea3ce61 [Wes McKinney] Return NONE Datum in else branch of functions
    4f969c6b [Wes McKinney] Move deprecation suppression after flag munging
    7f557cc0 [Wes McKinney] Code review comments, disable C4996 warning (equivalent to -Wno-deprecated) in MSVC builds
    84717461 [Wes McKinney] Do not compute hash table threshold on each iteration
    ae8f2339 [Wes McKinney] Fix double to int64_t conversion warning
    c1444a26 [Wes McKinney] Fix doxygen warnings
    2de85961 [Wes McKinney] Add test cases for unique, dictionary_encode
    383b46fd [Wes McKinney] Add Array methods for Unique, DictionaryEncode
    0962f06b [Wes McKinney] Add cast method for Column, chunked_array and column factory functions
    62c3cefd [Wes McKinney] Datum stubs
    27151c47 [Wes McKinney] Implement Cast for chunked arrays, fix kernel implementation. Change kernel API to write to a single Datum
    1bf2e2f4 [Wes McKinney] Fix bug with column using wrong type
    eaadc3e5 [Wes McKinney] Use macros to reduce code duplication in DoubleTableSize
    6b4f8f3c [Wes McKinney] Fix datetime64->date32 casting error raised by refactor
    2c77a19e [Wes McKinney] Some Decimal->Decimal128 renaming. Add DecimalType base class
    c07f91b3 [Wes McKinney] ARROW-1559: Add unique kernel
---
 LICENSE.txt                                        |   59 +
 cpp/CMakeLists.txt                                 |    1 +
 cpp/build-support/clang_format_exclusions.txt      |    2 +
 cpp/cmake_modules/SetupCxxFlags.cmake              |    9 +
 cpp/src/arrow/CMakeLists.txt                       |    4 +-
 cpp/src/arrow/array-test.cc                        |  356 +------
 cpp/src/arrow/array.cc                             |    2 +-
 cpp/src/arrow/buffer.h                             |    3 +-
 cpp/src/arrow/builder-benchmark.cc                 |   43 -
 cpp/src/arrow/builder.cc                           |  456 +-------
 cpp/src/arrow/builder.h                            |  203 +---
 cpp/src/arrow/compute/CMakeLists.txt               |    4 +-
 cpp/src/arrow/compute/api.h                        |    4 +-
 cpp/src/arrow/compute/compute-benchmark.cc         |   88 ++
 cpp/src/arrow/compute/compute-test.cc              |  369 ++++++-
 cpp/src/arrow/compute/context.cc                   |    7 +-
 cpp/src/arrow/compute/kernel.h                     |   97 +-
 cpp/src/arrow/compute/{ => kernels}/CMakeLists.txt |   21 +-
 cpp/src/arrow/compute/{ => kernels}/cast.cc        |  317 +++---
 cpp/src/arrow/compute/{ => kernels}/cast.h         |   33 +-
 cpp/src/arrow/compute/kernels/hash.cc              |  822 ++++++++++++++
 cpp/src/arrow/compute/kernels/hash.h               |  106 ++
 cpp/src/arrow/compute/kernels/util-internal.cc     |   85 ++
 cpp/src/arrow/compute/kernels/util-internal.h      |  105 ++
 cpp/src/arrow/pretty_print-test.cc                 |    5 +-
 cpp/src/arrow/python/arrow_to_pandas.cc            |   15 +-
 cpp/src/arrow/python/numpy_to_arrow.cc             |   59 +-
 cpp/src/arrow/table.h                              |    3 +
 cpp/src/arrow/test-util.h                          |    4 +
 cpp/src/arrow/type-test.cc                         |    6 +-
 cpp/src/arrow/type.h                               |   27 +-
 cpp/src/arrow/type_fwd.h                           |    6 +
 cpp/src/arrow/type_traits.h                        |    4 +
 cpp/src/arrow/util/CMakeLists.txt                  |    3 +
 cpp/src/arrow/util/variant.h                       | 1127 ++++++++++++++++++++
 .../arrow/{compute => util/variant}/CMakeLists.txt |   26 +-
 cpp/src/arrow/util/variant/optional.h              |  100 ++
 cpp/src/arrow/util/variant/recursive_wrapper.h     |  122 +++
 cpp/src/arrow/util/variant/variant_cast.h          |  112 ++
 cpp/src/arrow/util/variant/variant_io.h            |   72 ++
 cpp/src/arrow/util/variant/variant_visitor.h       |   69 ++
 cpp/src/arrow/visitor.cc                           |    2 +-
 cpp/src/arrow/visitor_inline.h                     |    2 +-
 dev/release/rat_exclude_files.txt                  |    6 +
 python/doc/source/api.rst                          |    3 +
 python/doc/source/development.rst                  |   20 +-
 python/pyarrow/__init__.py                         |    2 +-
 python/pyarrow/array.pxi                           |   37 +-
 python/pyarrow/includes/libarrow.pxd               |   49 +
 python/pyarrow/pandas_compat.py                    |    3 +-
 python/pyarrow/table.pxi                           |  129 ++-
 python/pyarrow/tests/test_array.py                 |   39 +
 python/pyarrow/tests/test_parquet.py               |    4 +
 python/pyarrow/types.pxi                           |    1 +
 54 files changed, 3908 insertions(+), 1345 deletions(-)

diff --git a/LICENSE.txt b/LICENSE.txt
index 00cb9ec..038518a 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -398,3 +398,62 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+--------------------------------------------------------------------------------
+
+This project includes code from the Boost project
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+--------------------------------------------------------------------------------
+
+This project includes code from the mapbox/variant project, BSD 3-clause
+license
+
+Copyright (c) MapBox
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+- Redistributions in binary form must reproduce the above copyright notice, this
+  list of conditions and the following disclaimer in the documentation and/or
+  other materials provided with the distribution.
+- Neither the name "MapBox" nor the names of its contributors may be
+  used to endorse or promote products derived from this software without
+  specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 5f0c431..f4b7b29 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -414,6 +414,7 @@ if (UNIX)
             (item MATCHES "xxhash.h") OR
             (item MATCHES "xxhash.cc") OR
             (item MATCHES "config.h") OR
+            (item MATCHES "util/variant") OR
             (item MATCHES "zmalloc.h") OR
             (item MATCHES "ae.h")))
       LIST(APPEND FILTERED_LINT_FILES ${item})
diff --git a/cpp/build-support/clang_format_exclusions.txt b/cpp/build-support/clang_format_exclusions.txt
index 2d5d86d..d31d8a0 100644
--- a/cpp/build-support/clang_format_exclusions.txt
+++ b/cpp/build-support/clang_format_exclusions.txt
@@ -3,6 +3,8 @@
 *pyarrow_api.h
 *python/config.h
 *python/platform.h
+*util/variant.h
+*util/variant/*
 *thirdparty/ae/*
 *xxhash.cc
 *xxhash.h
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 4b1950f..6b0974b 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -68,6 +68,9 @@ if ("${UPPERCASE_BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3")
     # Treat all compiler warnings as errors
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /WX")
+
+    # MSVC version of -Wno-deprecated
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4996")
   elseif ("${COMPILER_FAMILY}" STREQUAL "clang")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Weverything -Wno-c++98-compat \
 -Wno-c++98-compat-pedantic -Wno-deprecated -Wno-weak-vtables -Wno-padded \
@@ -115,6 +118,9 @@ elseif ("${UPPERCASE_BUILD_WARNING_LEVEL}" STREQUAL "EVERYTHING")
     # /wdnnnn disables a warning where "nnnn" is a warning number
     # Treat all compiler warnings as errors
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS}  /WX")
+
+    # MSVC version of -Wno-deprecated
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4996")
   elseif ("${COMPILER_FAMILY}" STREQUAL "clang")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic")
     # Treat all compiler warnings as errors
@@ -134,6 +140,9 @@ else()
     # /wdnnnn disables a warning where "nnnn" is a warning number
     string(REPLACE "/W3" "" CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS}")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3")
+
+    # MSVC version of -Wno-deprecated
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4996")
   elseif ("${COMPILER_FAMILY}" STREQUAL "clang")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
   elseif ("${COMPILER_FAMILY}" STREQUAL "gcc")
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 69d5052..496e0da 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -50,8 +50,10 @@ endif()
 if (ARROW_COMPUTE)
   add_subdirectory(compute)
   set(ARROW_SRCS ${ARROW_SRCS}
-    compute/cast.cc
     compute/context.cc
+    compute/kernels/cast.cc
+    compute/kernels/hash.cc
+    compute/kernels/util-internal.cc
   )
 endif()
 
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 15c7553..d894df1 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -402,10 +402,6 @@ typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PIn
 
 TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
 
-#define DECL_T() typedef typename TestFixture::T T;
-
-#define DECL_TYPE() typedef typename TestFixture::Type Type;
-
 TYPED_TEST(TestPrimitiveBuilder, TestInit) {
   DECL_TYPE();
 
@@ -1624,353 +1620,6 @@ TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) {
 }
 
 // ----------------------------------------------------------------------
-// Dictionary tests
-
-template <typename Type>
-class TestDictionaryBuilder : public TestBuilder {};
-
-typedef ::testing::Types<Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
-                         UInt32Type, Int64Type, UInt64Type, FloatType, DoubleType>
-    PrimitiveDictionaries;
-
-TYPED_TEST_CASE(TestDictionaryBuilder, PrimitiveDictionaries);
-
-TYPED_TEST(TestDictionaryBuilder, Basic) {
-  DictionaryBuilder<TypeParam> builder(default_memory_pool());
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
-
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Build expected data
-  NumericBuilder<TypeParam> dict_builder;
-  ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(1)));
-  ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
-  std::shared_ptr<Array> dict_array;
-  ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype = std::make_shared<DictionaryType>(int8(), dict_array);
-
-  Int8Builder int_builder;
-  ASSERT_OK(int_builder.Append(0));
-  ASSERT_OK(int_builder.Append(1));
-  ASSERT_OK(int_builder.Append(0));
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TYPED_TEST(TestDictionaryBuilder, ArrayConversion) {
-  NumericBuilder<TypeParam> builder;
-  // DictionaryBuilder<TypeParam> builder;
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
-  ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
-
-  std::shared_ptr<Array> intermediate_result;
-  ASSERT_OK(builder.Finish(&intermediate_result));
-  DictionaryBuilder<TypeParam> dictionary_builder(default_memory_pool());
-  ASSERT_OK(dictionary_builder.AppendArray(*intermediate_result));
-  std::shared_ptr<Array> result;
-  ASSERT_OK(dictionary_builder.Finish(&result));
-
-  // Build expected data
-  NumericBuilder<TypeParam> dict_builder;
-  ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(1)));
-  ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
-  std::shared_ptr<Array> dict_array;
-  ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype = std::make_shared<DictionaryType>(int8(), dict_array);
-
-  Int8Builder int_builder;
-  ASSERT_OK(int_builder.Append(0));
-  ASSERT_OK(int_builder.Append(1));
-  ASSERT_OK(int_builder.Append(0));
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TYPED_TEST(TestDictionaryBuilder, DoubleTableSize) {
-  using Scalar = typename TypeParam::c_type;
-  // Skip this test for (u)int8
-  if (sizeof(Scalar) > 1) {
-    // Build the dictionary Array
-    DictionaryBuilder<TypeParam> builder(default_memory_pool());
-    // Build expected data
-    NumericBuilder<TypeParam> dict_builder;
-    Int16Builder int_builder;
-
-    // Fill with 1024 different values
-    for (int64_t i = 0; i < 1024; i++) {
-      ASSERT_OK(builder.Append(static_cast<Scalar>(i)));
-      ASSERT_OK(dict_builder.Append(static_cast<Scalar>(i)));
-      ASSERT_OK(int_builder.Append(static_cast<uint16_t>(i)));
-    }
-    // Fill with an already existing value
-    for (int64_t i = 0; i < 1024; i++) {
-      ASSERT_OK(builder.Append(static_cast<Scalar>(1)));
-      ASSERT_OK(int_builder.Append(1));
-    }
-
-    // Finalize result
-    std::shared_ptr<Array> result;
-    ASSERT_OK(builder.Finish(&result));
-
-    // Finalize expected data
-    std::shared_ptr<Array> dict_array;
-    ASSERT_OK(dict_builder.Finish(&dict_array));
-    auto dtype = std::make_shared<DictionaryType>(int16(), dict_array);
-    std::shared_ptr<Array> int_array;
-    ASSERT_OK(int_builder.Finish(&int_array));
-
-    DictionaryArray expected(dtype, int_array);
-    ASSERT_TRUE(expected.Equals(result));
-  }
-}
-
-TEST(TestStringDictionaryBuilder, Basic) {
-  // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
-  ASSERT_OK(builder.Append("test"));
-  ASSERT_OK(builder.Append("test2"));
-  ASSERT_OK(builder.Append("test"));
-
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Build expected data
-  StringBuilder str_builder;
-  ASSERT_OK(str_builder.Append("test"));
-  ASSERT_OK(str_builder.Append("test2"));
-  std::shared_ptr<Array> str_array;
-  ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(int8(), str_array);
-
-  Int8Builder int_builder;
-  ASSERT_OK(int_builder.Append(0));
-  ASSERT_OK(int_builder.Append(1));
-  ASSERT_OK(int_builder.Append(0));
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TEST(TestStringDictionaryBuilder, DoubleTableSize) {
-  // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
-  // Build expected data
-  StringBuilder str_builder;
-  Int16Builder int_builder;
-
-  // Fill with 1024 different values
-  for (int64_t i = 0; i < 1024; i++) {
-    std::stringstream ss;
-    ss << "test" << i;
-    ASSERT_OK(builder.Append(ss.str()));
-    ASSERT_OK(str_builder.Append(ss.str()));
-    ASSERT_OK(int_builder.Append(static_cast<uint16_t>(i)));
-  }
-  // Fill with an already existing value
-  for (int64_t i = 0; i < 1024; i++) {
-    ASSERT_OK(builder.Append("test1"));
-    ASSERT_OK(int_builder.Append(1));
-  }
-
-  // Finalize result
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Finalize expected data
-  std::shared_ptr<Array> str_array;
-  ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(int16(), str_array);
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TEST(TestFixedSizeBinaryDictionaryBuilder, Basic) {
-  // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4),
-                                                 default_memory_pool());
-  std::vector<uint8_t> test{12, 12, 11, 12};
-  std::vector<uint8_t> test2{12, 12, 11, 11};
-  ASSERT_OK(builder.Append(test.data()));
-  ASSERT_OK(builder.Append(test2.data()));
-  ASSERT_OK(builder.Append(test.data()));
-
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Build expected data
-  FixedSizeBinaryBuilder fsb_builder(arrow::fixed_size_binary(4));
-  ASSERT_OK(fsb_builder.Append(test.data()));
-  ASSERT_OK(fsb_builder.Append(test2.data()));
-  std::shared_ptr<Array> fsb_array;
-  ASSERT_OK(fsb_builder.Finish(&fsb_array));
-  auto dtype = std::make_shared<DictionaryType>(int8(), fsb_array);
-
-  Int8Builder int_builder;
-  ASSERT_OK(int_builder.Append(0));
-  ASSERT_OK(int_builder.Append(1));
-  ASSERT_OK(int_builder.Append(0));
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TEST(TestFixedSizeBinaryDictionaryBuilder, DoubleTableSize) {
-  // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4),
-                                                 default_memory_pool());
-  // Build expected data
-  FixedSizeBinaryBuilder fsb_builder(arrow::fixed_size_binary(4));
-  Int16Builder int_builder;
-
-  // Fill with 1024 different values
-  for (int64_t i = 0; i < 1024; i++) {
-    std::vector<uint8_t> value{12, 12, static_cast<uint8_t>(i / 128),
-                               static_cast<uint8_t>(i % 128)};
-    ASSERT_OK(builder.Append(value.data()));
-    ASSERT_OK(fsb_builder.Append(value.data()));
-    ASSERT_OK(int_builder.Append(static_cast<uint16_t>(i)));
-  }
-  // Fill with an already existing value
-  std::vector<uint8_t> known_value{12, 12, 0, 1};
-  for (int64_t i = 0; i < 1024; i++) {
-    ASSERT_OK(builder.Append(known_value.data()));
-    ASSERT_OK(int_builder.Append(1));
-  }
-
-  // Finalize result
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Finalize expected data
-  std::shared_ptr<Array> fsb_array;
-  ASSERT_OK(fsb_builder.Finish(&fsb_array));
-  auto dtype = std::make_shared<DictionaryType>(int16(), fsb_array);
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TEST(TestFixedSizeBinaryDictionaryBuilder, InvalidTypeAppend) {
-  // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4),
-                                                 default_memory_pool());
-  // Build an array with different byte width
-  FixedSizeBinaryBuilder fsb_builder(arrow::fixed_size_binary(5));
-  std::vector<uint8_t> value{100, 1, 1, 1, 1};
-  ASSERT_OK(fsb_builder.Append(value.data()));
-  std::shared_ptr<Array> fsb_array;
-  ASSERT_OK(fsb_builder.Finish(&fsb_array));
-
-  ASSERT_RAISES(Invalid, builder.AppendArray(*fsb_array));
-}
-
-TEST(TestDecimalDictionaryBuilder, Basic) {
-  // Build the dictionary Array
-  const auto& decimal_type = arrow::decimal(2, 0);
-  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type, default_memory_pool());
-
-  // Test data
-  std::vector<Decimal128> test{12, 12, 11, 12};
-  for (const auto& value : test) {
-    ASSERT_OK(builder.Append(value.ToBytes().data()));
-  }
-
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Build expected data
-  FixedSizeBinaryBuilder decimal_builder(decimal_type);
-  ASSERT_OK(decimal_builder.Append(Decimal128(12).ToBytes()));
-  ASSERT_OK(decimal_builder.Append(Decimal128(11).ToBytes()));
-
-  std::shared_ptr<Array> decimal_array;
-  ASSERT_OK(decimal_builder.Finish(&decimal_array));
-  auto dtype = arrow::dictionary(int8(), decimal_array);
-
-  Int8Builder int_builder;
-  ASSERT_OK(int_builder.Append({0, 0, 1, 0}));
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-TEST(TestDecimalDictionaryBuilder, DoubleTableSize) {
-  const auto& decimal_type = arrow::decimal(21, 0);
-
-  // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type, default_memory_pool());
-
-  // Build expected data
-  FixedSizeBinaryBuilder fsb_builder(decimal_type);
-  Int16Builder int_builder;
-
-  // Fill with 1024 different values
-  for (int64_t i = 0; i < 1024; i++) {
-    const uint8_t bytes[] = {0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             0,
-                             12,
-                             12,
-                             static_cast<uint8_t>(i / 128),
-                             static_cast<uint8_t>(i % 128)};
-    ASSERT_OK(builder.Append(bytes));
-    ASSERT_OK(fsb_builder.Append(bytes));
-    ASSERT_OK(int_builder.Append(static_cast<uint16_t>(i)));
-  }
-  // Fill with an already existing value
-  const uint8_t known_value[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 12, 0, 1};
-  for (int64_t i = 0; i < 1024; i++) {
-    ASSERT_OK(builder.Append(known_value));
-    ASSERT_OK(int_builder.Append(1));
-  }
-
-  // Finalize result
-  std::shared_ptr<Array> result;
-  ASSERT_OK(builder.Finish(&result));
-
-  // Finalize expected data
-  std::shared_ptr<Array> fsb_array;
-  ASSERT_OK(fsb_builder.Finish(&fsb_array));
-
-  auto dtype = std::make_shared<DictionaryType>(int16(), fsb_array);
-  std::shared_ptr<Array> int_array;
-  ASSERT_OK(int_builder.Finish(&int_array));
-
-  DictionaryArray expected(dtype, int_array);
-  ASSERT_TRUE(expected.Equals(result));
-}
-
-// ----------------------------------------------------------------------
 // List tests
 
 class TestListArray : public TestBuilder {
@@ -2766,9 +2415,8 @@ class DecimalTest : public ::testing::TestWithParam<int> {
   template <size_t BYTE_WIDTH = 16>
   void TestCreate(int32_t precision, const DecimalVector& draw,
                   const std::vector<uint8_t>& valid_bytes, int64_t offset) const {
-    auto type = std::make_shared<DecimalType>(precision, 4);
-
-    auto builder = std::make_shared<DecimalBuilder>(type);
+    auto type = std::make_shared<Decimal128Type>(precision, 4);
+    auto builder = std::make_shared<Decimal128Builder>(type);
 
     size_t null_count = 0;
 
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index f2dd753..4ceb071 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -92,7 +92,7 @@ static inline std::shared_ptr<ArrayData> SliceData(const ArrayData& data, int64_
   auto new_data = data.ShallowCopy();
   new_data->length = length;
   new_data->offset = offset;
-  new_data->null_count = kUnknownNullCount;
+  new_data->null_count = data.null_count != 0 ? kUnknownNullCount : 0;
   return new_data;
 }
 
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 7c5f617..450a4c7 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -340,12 +340,13 @@ Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size,
 #ifndef ARROW_NO_DEPRECATED_API
 
 /// \brief Create Buffer referencing std::string memory
-/// \note Deprecated since 0.8.0
 ///
 /// Warning: string instance must stay alive
 ///
 /// \param str std::string instance
 /// \return std::shared_ptr<Buffer>
+///
+/// \note Deprecated Since 0.8.0
 static inline std::shared_ptr<Buffer> GetBufferFromString(const std::string& str) {
   return std::make_shared<Buffer>(str);
 }
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index 7ac7fe3..12dfbe8 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -115,47 +115,6 @@ static void BM_BuildAdaptiveUIntNoNulls(
   state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
 }
 
-static void BM_BuildDictionary(benchmark::State& state) {  // NOLINT non-const reference
-  const int64_t iterations = 1024;
-  while (state.KeepRunning()) {
-    DictionaryBuilder<Int64Type> builder(default_memory_pool());
-    for (int64_t i = 0; i < iterations; i++) {
-      for (int64_t j = 0; j < i; j++) {
-        ABORT_NOT_OK(builder.Append(j));
-      }
-    }
-    std::shared_ptr<Array> out;
-    ABORT_NOT_OK(builder.Finish(&out));
-  }
-  state.SetBytesProcessed(state.iterations() * iterations * (iterations + 1) / 2 *
-                          sizeof(int64_t));
-}
-
-static void BM_BuildStringDictionary(
-    benchmark::State& state) {  // NOLINT non-const reference
-  const int64_t iterations = 1024;
-  // Pre-render strings
-  std::vector<std::string> data;
-  for (int64_t i = 0; i < iterations; i++) {
-    std::stringstream ss;
-    ss << i;
-    data.push_back(ss.str());
-  }
-  while (state.KeepRunning()) {
-    StringDictionaryBuilder builder(default_memory_pool());
-    for (int64_t i = 0; i < iterations; i++) {
-      for (int64_t j = 0; j < i; j++) {
-        ABORT_NOT_OK(builder.Append(data[j]));
-      }
-    }
-    std::shared_ptr<Array> out;
-    ABORT_NOT_OK(builder.Finish(&out));
-  }
-  // Assuming a string here needs on average 2 bytes
-  state.SetBytesProcessed(state.iterations() * iterations * (iterations + 1) / 2 *
-                          sizeof(int32_t));
-}
-
 static void BM_BuildBinaryArray(benchmark::State& state) {  // NOLINT non-const reference
   const int64_t iterations = 1 << 20;
 
@@ -179,8 +138,6 @@ BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend)
     ->Repetitions(3)
     ->Unit(benchmark::kMicrosecond);
 BENCHMARK(BM_BuildAdaptiveUIntNoNulls)->Repetitions(3)->Unit(benchmark::kMicrosecond);
-BENCHMARK(BM_BuildDictionary)->Repetitions(3)->Unit(benchmark::kMicrosecond);
-BENCHMARK(BM_BuildStringDictionary)->Repetitions(3)->Unit(benchmark::kMicrosecond);
 
 BENCHMARK(BM_BuildBinaryArray)->Repetitions(3)->Unit(benchmark::kMicrosecond);
 
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index d2d3dbd..3e213fc 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -40,7 +40,6 @@
 namespace arrow {
 
 using internal::AdaptiveIntBuilderBase;
-using internal::WrappedBinary;
 
 Status ArrayBuilder::AppendToBitmap(bool is_valid) {
   if (length_ == capacity_) {
@@ -815,338 +814,6 @@ Status BooleanBuilder::Append(const std::vector<bool>& values) {
 }
 
 // ----------------------------------------------------------------------
-// DictionaryBuilder
-
-template <typename T>
-DictionaryBuilder<T>::DictionaryBuilder(const std::shared_ptr<DataType>& type,
-                                        MemoryPool* pool)
-    : ArrayBuilder(type, pool),
-      hash_table_(new PoolBuffer(pool)),
-      hash_slots_(nullptr),
-      dict_builder_(type, pool),
-      values_builder_(pool),
-      byte_width_(-1) {
-  if (!::arrow::CpuInfo::initialized()) {
-    ::arrow::CpuInfo::Init();
-  }
-}
-
-DictionaryBuilder<NullType>::DictionaryBuilder(const std::shared_ptr<DataType>& type,
-                                               MemoryPool* pool)
-    : ArrayBuilder(type, pool), values_builder_(pool) {
-  if (!::arrow::CpuInfo::initialized()) {
-    ::arrow::CpuInfo::Init();
-  }
-}
-
-DictionaryBuilder<NullType>::~DictionaryBuilder() {}
-
-template <>
-DictionaryBuilder<FixedSizeBinaryType>::DictionaryBuilder(
-    const std::shared_ptr<DataType>& type, MemoryPool* pool)
-    : ArrayBuilder(type, pool),
-      hash_table_(new PoolBuffer(pool)),
-      hash_slots_(nullptr),
-      dict_builder_(type, pool),
-      values_builder_(pool),
-      byte_width_(static_cast<const FixedSizeBinaryType&>(*type).byte_width()) {
-  if (!::arrow::CpuInfo::initialized()) {
-    ::arrow::CpuInfo::Init();
-  }
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::Init(int64_t elements) {
-  RETURN_NOT_OK(ArrayBuilder::Init(elements));
-
-  // Fill the initial hash table
-  RETURN_NOT_OK(hash_table_->Resize(sizeof(hash_slot_t) * kInitialHashTableSize));
-  hash_slots_ = reinterpret_cast<int32_t*>(hash_table_->mutable_data());
-  std::fill(hash_slots_, hash_slots_ + kInitialHashTableSize, kHashSlotEmpty);
-  hash_table_size_ = kInitialHashTableSize;
-  mod_bitmask_ = kInitialHashTableSize - 1;
-
-  return values_builder_.Init(elements);
-}
-
-Status DictionaryBuilder<NullType>::Init(int64_t elements) {
-  RETURN_NOT_OK(ArrayBuilder::Init(elements));
-  return values_builder_.Init(elements);
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::Resize(int64_t capacity) {
-  if (capacity < kMinBuilderCapacity) {
-    capacity = kMinBuilderCapacity;
-  }
-
-  if (capacity_ == 0) {
-    return Init(capacity);
-  } else {
-    return ArrayBuilder::Resize(capacity);
-  }
-}
-
-Status DictionaryBuilder<NullType>::Resize(int64_t capacity) {
-  if (capacity < kMinBuilderCapacity) {
-    capacity = kMinBuilderCapacity;
-  }
-
-  if (capacity_ == 0) {
-    return Init(capacity);
-  } else {
-    return ArrayBuilder::Resize(capacity);
-  }
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  std::shared_ptr<Array> dictionary;
-  RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
-
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
-  (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
-  return Status::OK();
-}
-
-Status DictionaryBuilder<NullType>::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  std::shared_ptr<Array> dictionary = std::make_shared<NullArray>(0);
-
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
-  (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
-  return Status::OK();
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::Append(const Scalar& value) {
-  RETURN_NOT_OK(Reserve(1));
-  // Based on DictEncoder<DType>::Put
-  int j = HashValue(value) & mod_bitmask_;
-  hash_slot_t index = hash_slots_[j];
-
-  // Find an empty slot
-  while (kHashSlotEmpty != index && SlotDifferent(index, value)) {
-    // Linear probing
-    ++j;
-    if (j == hash_table_size_) {
-      j = 0;
-    }
-    index = hash_slots_[j];
-  }
-
-  if (index == kHashSlotEmpty) {
-    // Not in the hash table, so we insert it now
-    index = static_cast<hash_slot_t>(dict_builder_.length());
-    hash_slots_[j] = index;
-    RETURN_NOT_OK(AppendDictionary(value));
-
-    if (ARROW_PREDICT_FALSE(static_cast<int32_t>(dict_builder_.length()) >
-                            hash_table_size_ * kMaxHashTableLoad)) {
-      RETURN_NOT_OK(DoubleTableSize());
-    }
-  }
-
-  RETURN_NOT_OK(values_builder_.Append(index));
-
-  return Status::OK();
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::AppendArray(const Array& array) {
-  const auto& numeric_array = static_cast<const NumericArray<T>&>(array);
-  for (int64_t i = 0; i < array.length(); i++) {
-    if (array.IsNull(i)) {
-      RETURN_NOT_OK(AppendNull());
-    } else {
-      RETURN_NOT_OK(Append(numeric_array.Value(i)));
-    }
-  }
-  return Status::OK();
-}
-
-Status DictionaryBuilder<NullType>::AppendArray(const Array& array) {
-  for (int64_t i = 0; i < array.length(); i++) {
-    RETURN_NOT_OK(AppendNull());
-  }
-  return Status::OK();
-}
-
-template <>
-Status DictionaryBuilder<FixedSizeBinaryType>::AppendArray(const Array& array) {
-  if (!type_->Equals(*array.type())) {
-    return Status::Invalid("Cannot append FixedSizeBinary array with non-matching type");
-  }
-
-  const auto& numeric_array = static_cast<const FixedSizeBinaryArray&>(array);
-  for (int64_t i = 0; i < array.length(); i++) {
-    if (array.IsNull(i)) {
-      RETURN_NOT_OK(AppendNull());
-    } else {
-      RETURN_NOT_OK(Append(numeric_array.Value(i)));
-    }
-  }
-  return Status::OK();
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::AppendNull() {
-  return values_builder_.AppendNull();
-}
-
-Status DictionaryBuilder<NullType>::AppendNull() { return values_builder_.AppendNull(); }
-
-template <typename T>
-Status DictionaryBuilder<T>::DoubleTableSize() {
-  int new_size = hash_table_size_ * 2;
-  auto new_hash_table = std::make_shared<PoolBuffer>(pool_);
-
-  RETURN_NOT_OK(new_hash_table->Resize(sizeof(hash_slot_t) * new_size));
-  int32_t* new_hash_slots = reinterpret_cast<int32_t*>(new_hash_table->mutable_data());
-  std::fill(new_hash_slots, new_hash_slots + new_size, kHashSlotEmpty);
-  int new_mod_bitmask = new_size - 1;
-
-  for (int i = 0; i < hash_table_size_; ++i) {
-    hash_slot_t index = hash_slots_[i];
-
-    if (index == kHashSlotEmpty) {
-      continue;
-    }
-
-    // Compute the hash value mod the new table size to start looking for an
-    // empty slot
-    Scalar value = GetDictionaryValue(static_cast<int64_t>(index));
-
-    // Find an empty slot in the new hash table
-    int j = HashValue(value) & new_mod_bitmask;
-    hash_slot_t slot = new_hash_slots[j];
-
-    while (kHashSlotEmpty != slot && SlotDifferent(slot, value)) {
-      ++j;
-      if (j == new_size) {
-        j = 0;
-      }
-      slot = new_hash_slots[j];
-    }
-
-    // Copy the old slot index to the new hash table
-    new_hash_slots[j] = index;
-  }
-
-  hash_table_ = new_hash_table;
-  hash_slots_ = reinterpret_cast<int32_t*>(hash_table_->mutable_data());
-  hash_table_size_ = new_size;
-  mod_bitmask_ = new_size - 1;
-
-  return Status::OK();
-}
-
-template <typename T>
-typename DictionaryBuilder<T>::Scalar DictionaryBuilder<T>::GetDictionaryValue(
-    int64_t index) {
-  const Scalar* data = reinterpret_cast<const Scalar*>(dict_builder_.data()->data());
-  return data[index];
-}
-
-template <>
-const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue(int64_t index) {
-  return dict_builder_.GetValue(index);
-}
-
-template <typename T>
-int DictionaryBuilder<T>::HashValue(const Scalar& value) {
-  return HashUtil::Hash(&value, sizeof(Scalar), 0);
-}
-
-template <>
-int DictionaryBuilder<FixedSizeBinaryType>::HashValue(const Scalar& value) {
-  return HashUtil::Hash(value, byte_width_, 0);
-}
-
-template <typename T>
-bool DictionaryBuilder<T>::SlotDifferent(hash_slot_t index, const Scalar& value) {
-  const Scalar other = GetDictionaryValue(static_cast<int64_t>(index));
-  return other != value;
-}
-
-template <>
-bool DictionaryBuilder<FixedSizeBinaryType>::SlotDifferent(hash_slot_t index,
-                                                           const Scalar& value) {
-  int32_t width = static_cast<const FixedSizeBinaryType&>(*type_).byte_width();
-  const Scalar other = GetDictionaryValue(static_cast<int64_t>(index));
-  return memcmp(other, value, width) != 0;
-}
-
-template <typename T>
-Status DictionaryBuilder<T>::AppendDictionary(const Scalar& value) {
-  return dict_builder_.Append(value);
-}
-
-#define BINARY_DICTIONARY_SPECIALIZATIONS(Type)                                     \
-  template <>                                                                       \
-  WrappedBinary DictionaryBuilder<Type>::GetDictionaryValue(int64_t index) {        \
-    int32_t v_len;                                                                  \
-    const uint8_t* v = dict_builder_.GetValue(static_cast<int64_t>(index), &v_len); \
-    return WrappedBinary(v, v_len);                                                 \
-  }                                                                                 \
-                                                                                    \
-  template <>                                                                       \
-  Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) {    \
-    return dict_builder_.Append(value.ptr_, value.length_);                         \
-  }                                                                                 \
-                                                                                    \
-  template <>                                                                       \
-  Status DictionaryBuilder<Type>::AppendArray(const Array& array) {                 \
-    const BinaryArray& binary_array = static_cast<const BinaryArray&>(array);       \
-    WrappedBinary value(nullptr, 0);                                                \
-    for (int64_t i = 0; i < array.length(); i++) {                                  \
-      if (array.IsNull(i)) {                                                        \
-        RETURN_NOT_OK(AppendNull());                                                \
-      } else {                                                                      \
-        value.ptr_ = binary_array.GetValue(i, &value.length_);                      \
-        RETURN_NOT_OK(Append(value));                                               \
-      }                                                                             \
-    }                                                                               \
-    return Status::OK();                                                            \
-  }                                                                                 \
-                                                                                    \
-  template <>                                                                       \
-  int DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) {              \
-    return HashUtil::Hash(value.ptr_, value.length_, 0);                            \
-  }                                                                                 \
-                                                                                    \
-  template <>                                                                       \
-  bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index,                    \
-                                              const WrappedBinary& value) {         \
-    int32_t other_length;                                                           \
-    const uint8_t* other_value =                                                    \
-        dict_builder_.GetValue(static_cast<int64_t>(index), &other_length);         \
-    return !(other_length == value.length_ &&                                       \
-             0 == memcmp(other_value, value.ptr_, value.length_));                  \
-  }
-
-BINARY_DICTIONARY_SPECIALIZATIONS(StringType);
-BINARY_DICTIONARY_SPECIALIZATIONS(BinaryType);
-
-template class DictionaryBuilder<UInt8Type>;
-template class DictionaryBuilder<UInt16Type>;
-template class DictionaryBuilder<UInt32Type>;
-template class DictionaryBuilder<UInt64Type>;
-template class DictionaryBuilder<Int8Type>;
-template class DictionaryBuilder<Int16Type>;
-template class DictionaryBuilder<Int32Type>;
-template class DictionaryBuilder<Int64Type>;
-template class DictionaryBuilder<Date32Type>;
-template class DictionaryBuilder<Date64Type>;
-template class DictionaryBuilder<Time32Type>;
-template class DictionaryBuilder<Time64Type>;
-template class DictionaryBuilder<TimestampType>;
-template class DictionaryBuilder<FloatType>;
-template class DictionaryBuilder<DoubleType>;
-template class DictionaryBuilder<FixedSizeBinaryType>;
-template class DictionaryBuilder<BinaryType>;
-template class DictionaryBuilder<StringType>;
-
-// ----------------------------------------------------------------------
 // Decimal128Builder
 
 Decimal128Builder::Decimal128Builder(const std::shared_ptr<DataType>& type,
@@ -1446,7 +1113,7 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
       BUILDER_CASE(STRING, StringBuilder);
       BUILDER_CASE(BINARY, BinaryBuilder);
       BUILDER_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryBuilder);
-      BUILDER_CASE(DECIMAL, DecimalBuilder);
+      BUILDER_CASE(DECIMAL, Decimal128Builder);
     case Type::LIST: {
       std::unique_ptr<ArrayBuilder> value_builder;
       std::shared_ptr<DataType> value_type =
@@ -1477,125 +1144,4 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
   }
 }
 
-#define DICTIONARY_BUILDER_CASE(ENUM, BuilderType) \
-  case Type::ENUM:                                 \
-    out->reset(new BuilderType(type, pool));       \
-    return Status::OK();
-
-Status MakeDictionaryBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
-                             std::shared_ptr<ArrayBuilder>* out) {
-  switch (type->id()) {
-    DICTIONARY_BUILDER_CASE(NA, DictionaryBuilder<NullType>);
-    DICTIONARY_BUILDER_CASE(UINT8, DictionaryBuilder<UInt8Type>);
-    DICTIONARY_BUILDER_CASE(INT8, DictionaryBuilder<Int8Type>);
-    DICTIONARY_BUILDER_CASE(UINT16, DictionaryBuilder<UInt16Type>);
-    DICTIONARY_BUILDER_CASE(INT16, DictionaryBuilder<Int16Type>);
-    DICTIONARY_BUILDER_CASE(UINT32, DictionaryBuilder<UInt32Type>);
-    DICTIONARY_BUILDER_CASE(INT32, DictionaryBuilder<Int32Type>);
-    DICTIONARY_BUILDER_CASE(UINT64, DictionaryBuilder<UInt64Type>);
-    DICTIONARY_BUILDER_CASE(INT64, DictionaryBuilder<Int64Type>);
-    DICTIONARY_BUILDER_CASE(DATE32, DictionaryBuilder<Date32Type>);
-    DICTIONARY_BUILDER_CASE(DATE64, DictionaryBuilder<Date64Type>);
-    DICTIONARY_BUILDER_CASE(TIME32, DictionaryBuilder<Time32Type>);
-    DICTIONARY_BUILDER_CASE(TIME64, DictionaryBuilder<Time64Type>);
-    DICTIONARY_BUILDER_CASE(TIMESTAMP, DictionaryBuilder<TimestampType>);
-    DICTIONARY_BUILDER_CASE(FLOAT, DictionaryBuilder<FloatType>);
-    DICTIONARY_BUILDER_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
-    DICTIONARY_BUILDER_CASE(STRING, StringDictionaryBuilder);
-    DICTIONARY_BUILDER_CASE(BINARY, BinaryDictionaryBuilder);
-    DICTIONARY_BUILDER_CASE(FIXED_SIZE_BINARY, DictionaryBuilder<FixedSizeBinaryType>);
-    DICTIONARY_BUILDER_CASE(DECIMAL, DictionaryBuilder<FixedSizeBinaryType>);
-    default:
-      return Status::NotImplemented(type->ToString());
-  }
-}
-
-#define DICTIONARY_ARRAY_CASE(ENUM, BuilderType)                           \
-  case Type::ENUM:                                                         \
-    builder = std::make_shared<BuilderType>(type, pool);                   \
-    RETURN_NOT_OK(static_cast<BuilderType&>(*builder).AppendArray(input)); \
-    RETURN_NOT_OK(builder->Finish(out));                                   \
-    return Status::OK();
-
-Status EncodeArrayToDictionary(const Array& input, MemoryPool* pool,
-                               std::shared_ptr<Array>* out) {
-  const std::shared_ptr<DataType>& type = input.data()->type;
-  std::shared_ptr<ArrayBuilder> builder;
-  switch (type->id()) {
-    DICTIONARY_ARRAY_CASE(NA, DictionaryBuilder<NullType>);
-    DICTIONARY_ARRAY_CASE(UINT8, DictionaryBuilder<UInt8Type>);
-    DICTIONARY_ARRAY_CASE(INT8, DictionaryBuilder<Int8Type>);
-    DICTIONARY_ARRAY_CASE(UINT16, DictionaryBuilder<UInt16Type>);
-    DICTIONARY_ARRAY_CASE(INT16, DictionaryBuilder<Int16Type>);
-    DICTIONARY_ARRAY_CASE(UINT32, DictionaryBuilder<UInt32Type>);
-    DICTIONARY_ARRAY_CASE(INT32, DictionaryBuilder<Int32Type>);
-    DICTIONARY_ARRAY_CASE(UINT64, DictionaryBuilder<UInt64Type>);
-    DICTIONARY_ARRAY_CASE(INT64, DictionaryBuilder<Int64Type>);
-    DICTIONARY_ARRAY_CASE(DATE32, DictionaryBuilder<Date32Type>);
-    DICTIONARY_ARRAY_CASE(DATE64, DictionaryBuilder<Date64Type>);
-    DICTIONARY_ARRAY_CASE(TIME32, DictionaryBuilder<Time32Type>);
-    DICTIONARY_ARRAY_CASE(TIME64, DictionaryBuilder<Time64Type>);
-    DICTIONARY_ARRAY_CASE(TIMESTAMP, DictionaryBuilder<TimestampType>);
-    DICTIONARY_ARRAY_CASE(FLOAT, DictionaryBuilder<FloatType>);
-    DICTIONARY_ARRAY_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
-    DICTIONARY_ARRAY_CASE(STRING, StringDictionaryBuilder);
-    DICTIONARY_ARRAY_CASE(BINARY, BinaryDictionaryBuilder);
-    DICTIONARY_ARRAY_CASE(FIXED_SIZE_BINARY, DictionaryBuilder<FixedSizeBinaryType>);
-    DICTIONARY_ARRAY_CASE(DECIMAL, DictionaryBuilder<FixedSizeBinaryType>);
-    default:
-      std::stringstream ss;
-      ss << "Cannot encode array of type " << type->ToString();
-      ss << " to dictionary";
-      return Status::NotImplemented(ss.str());
-  }
-}
-#define DICTIONARY_COLUMN_CASE(ENUM, BuilderType)                             \
-  case Type::ENUM:                                                            \
-    builder = std::make_shared<BuilderType>(type, pool);                      \
-    chunks = input.data();                                                    \
-    for (auto chunk : chunks->chunks()) {                                     \
-      RETURN_NOT_OK(static_cast<BuilderType&>(*builder).AppendArray(*chunk)); \
-    }                                                                         \
-    RETURN_NOT_OK(builder->Finish(&arr));                                     \
-    *out = std::make_shared<Column>(input.name(), arr);                       \
-    return Status::OK();
-
-/// \brief Encodes a column to a suitable dictionary type
-/// \param input Column to be encoded
-/// \param pool MemoryPool to allocate the dictionary
-/// \param out The new column
-/// \return Status
-Status EncodeColumnToDictionary(const Column& input, MemoryPool* pool,
-                                std::shared_ptr<Column>* out) {
-  const std::shared_ptr<DataType>& type = input.type();
-  std::shared_ptr<ArrayBuilder> builder;
-  std::shared_ptr<Array> arr;
-  std::shared_ptr<ChunkedArray> chunks;
-  switch (type->id()) {
-    DICTIONARY_COLUMN_CASE(UINT8, DictionaryBuilder<UInt8Type>);
-    DICTIONARY_COLUMN_CASE(INT8, DictionaryBuilder<Int8Type>);
-    DICTIONARY_COLUMN_CASE(UINT16, DictionaryBuilder<UInt16Type>);
-    DICTIONARY_COLUMN_CASE(INT16, DictionaryBuilder<Int16Type>);
-    DICTIONARY_COLUMN_CASE(UINT32, DictionaryBuilder<UInt32Type>);
-    DICTIONARY_COLUMN_CASE(INT32, DictionaryBuilder<Int32Type>);
-    DICTIONARY_COLUMN_CASE(UINT64, DictionaryBuilder<UInt64Type>);
-    DICTIONARY_COLUMN_CASE(INT64, DictionaryBuilder<Int64Type>);
-    DICTIONARY_COLUMN_CASE(DATE32, DictionaryBuilder<Date32Type>);
-    DICTIONARY_COLUMN_CASE(DATE64, DictionaryBuilder<Date64Type>);
-    DICTIONARY_COLUMN_CASE(TIME32, DictionaryBuilder<Time32Type>);
-    DICTIONARY_COLUMN_CASE(TIME64, DictionaryBuilder<Time64Type>);
-    DICTIONARY_COLUMN_CASE(TIMESTAMP, DictionaryBuilder<TimestampType>);
-    DICTIONARY_COLUMN_CASE(FLOAT, DictionaryBuilder<FloatType>);
-    DICTIONARY_COLUMN_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
-    DICTIONARY_COLUMN_CASE(STRING, StringDictionaryBuilder);
-    DICTIONARY_COLUMN_CASE(BINARY, BinaryDictionaryBuilder);
-    DICTIONARY_COLUMN_CASE(FIXED_SIZE_BINARY, DictionaryBuilder<FixedSizeBinaryType>);
-    default:
-      std::stringstream ss;
-      ss << "Cannot encode column of type " << type->ToString();
-      ss << " to dictionary";
-      return Status::NotImplemented(ss.str());
-  }
-}
-
 }  // namespace arrow
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index bc25d0d..32741b5 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -123,6 +123,18 @@ class ARROW_EXPORT ArrayBuilder {
 
   std::shared_ptr<DataType> type() const { return type_; }
 
+  // Unsafe operations (don't check capacity/don't resize)
+
+  // Append one validity bit for the next slot; performs no capacity check or resize.
+  void UnsafeAppendToBitmap(bool is_valid) {
+    if (is_valid) {
+      BitUtil::SetBit(null_bitmap_data_, length_);
+    } else {
+      ++null_count_;  // bit not cleared here; assumes bitmap starts zeroed (confirm)
+    }
+    ++length_;  // the slot is appended whether it is valid or null
+  }
+
  protected:
   ArrayBuilder() {}
 
@@ -143,18 +155,6 @@ class ARROW_EXPORT ArrayBuilder {
 
   void Reset();
 
-  // Unsafe operations (don't check capacity/don't resize)
-
-  // Append to null bitmap.
-  void UnsafeAppendToBitmap(bool is_valid) {
-    if (is_valid) {
-      BitUtil::SetBit(null_bitmap_data_, length_);
-    } else {
-      ++null_count_;
-    }
-    ++length_;
-  }
-
   // Vector append. Treat each zero byte as a nullzero. If valid_bytes is null
   // assume all of length bits are valid.
   void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t length);
@@ -811,190 +811,11 @@ class ARROW_EXPORT StructBuilder : public ArrayBuilder {
 };
 
 // ----------------------------------------------------------------------
-// Dictionary builder
-
-// Based on Apache Parquet-cpp's DictEncoder
-
-// Initially 1024 elements
-static constexpr int kInitialHashTableSize = 1 << 10;
-
-typedef int32_t hash_slot_t;
-static constexpr hash_slot_t kHashSlotEmpty = std::numeric_limits<int32_t>::max();
-
-// The maximum load factor for the hash table before resizing.
-static constexpr double kMaxHashTableLoad = 0.7;
-
-namespace internal {
-
-// TODO(ARROW-1176): Use Tensorflow's StringPiece instead of this here.
-struct WrappedBinary {
-  WrappedBinary(const uint8_t* ptr, int32_t length) : ptr_(ptr), length_(length) {}
-
-  const uint8_t* ptr_;
-  int32_t length_;
-};
-
-template <typename T>
-struct DictionaryScalar {
-  using type = typename T::c_type;
-};
-
-template <>
-struct DictionaryScalar<BinaryType> {
-  using type = WrappedBinary;
-};
-
-template <>
-struct DictionaryScalar<StringType> {
-  using type = WrappedBinary;
-};
-
-template <>
-struct DictionaryScalar<FixedSizeBinaryType> {
-  using type = uint8_t const*;
-};
-
-}  // namespace internal
-
-/// \brief Array builder for created encoded DictionaryArray from dense array
-/// data
-template <typename T>
-class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
- public:
-  using Scalar = typename internal::DictionaryScalar<T>::type;
-
-  ~DictionaryBuilder() {}
-
-  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-
-  template <typename T1 = T>
-  explicit DictionaryBuilder(
-      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool)
-      : DictionaryBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
-
-  /// \brief Append a scalar value
-  Status Append(const Scalar& value);
-
-  /// \brief Append a scalar null value
-  Status AppendNull();
-
-  /// \brief Append a whole dense array to the builder
-  Status AppendArray(const Array& array);
-
-  Status Init(int64_t elements) override;
-  Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
- protected:
-  Status DoubleTableSize();
-  Scalar GetDictionaryValue(int64_t index);
-  int HashValue(const Scalar& value);
-  bool SlotDifferent(hash_slot_t slot, const Scalar& value);
-  Status AppendDictionary(const Scalar& value);
-
-  std::shared_ptr<PoolBuffer> hash_table_;
-  int32_t* hash_slots_;
-
-  /// Size of the table. Must be a power of 2.
-  int hash_table_size_;
-
-  // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
-  // hash_table_size_, but uses far fewer CPU cycles
-  int mod_bitmask_;
-
-  typename TypeTraits<T>::BuilderType dict_builder_;
-  AdaptiveIntBuilder values_builder_;
-  int32_t byte_width_;
-};
-
-template <>
-class ARROW_EXPORT DictionaryBuilder<NullType> : public ArrayBuilder {
- public:
-  ~DictionaryBuilder();
-
-  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-  explicit DictionaryBuilder(MemoryPool* pool);
-
-  /// \brief Append a scalar null value
-  Status AppendNull();
-
-  /// \brief Append a whole dense array to the builder
-  Status AppendArray(const Array& array);
-
-  Status Init(int64_t elements) override;
-  Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
- protected:
-  AdaptiveIntBuilder values_builder_;
-};
-
-class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType> {
- public:
-  using DictionaryBuilder::Append;
-  using DictionaryBuilder::DictionaryBuilder;
-
-  Status Append(const uint8_t* value, int32_t length) {
-    return Append(internal::WrappedBinary(value, length));
-  }
-
-  Status Append(const char* value, int32_t length) {
-    return Append(
-        internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value), length));
-  }
-
-  Status Append(const std::string& value) {
-    return Append(internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value.c_str()),
-                                          static_cast<int32_t>(value.size())));
-  }
-};
-
-/// \brief Dictionary array builder with convenience methods for strings
-class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType> {
- public:
-  using DictionaryBuilder::Append;
-  using DictionaryBuilder::DictionaryBuilder;
-
-  Status Append(const uint8_t* value, int32_t length) {
-    return Append(internal::WrappedBinary(value, length));
-  }
-
-  Status Append(const char* value, int32_t length) {
-    return Append(
-        internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value), length));
-  }
-
-  Status Append(const std::string& value) {
-    return Append(internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value.c_str()),
-                                          static_cast<int32_t>(value.size())));
-  }
-};
-
-// ----------------------------------------------------------------------
 // Helper functions
 
 Status ARROW_EXPORT MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
                                 std::unique_ptr<ArrayBuilder>* out);
 
-Status ARROW_EXPORT MakeDictionaryBuilder(MemoryPool* pool,
-                                          const std::shared_ptr<DataType>& type,
-                                          std::shared_ptr<ArrayBuilder>* out);
-
-/// \brief Convert Array to encoded DictionaryArray form
-///
-/// \param[in] input The Array to be encoded
-/// \param[in] pool MemoryPool to allocate memory for the hash table
-/// \param[out] out Array encoded to DictionaryArray
-Status ARROW_EXPORT EncodeArrayToDictionary(const Array& input, MemoryPool* pool,
-                                            std::shared_ptr<Array>* out);
-
-/// \brief Convert a Column's data internally to DictionaryArray
-///
-/// \param[in] input The ChunkedArray to be encoded
-/// \param[in] pool MemoryPool to allocate memory for the hash table
-/// \param[out] out Column with data converted to DictionaryArray
-Status ARROW_EXPORT EncodeColumnToDictionary(const Column& input, MemoryPool* pool,
-                                             std::shared_ptr<Column>* out);
 }  // namespace arrow
 
 #endif  // ARROW_BUILDER_H_
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 4589afb..d4369ed 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -18,7 +18,6 @@
 # Headers: top level
 install(FILES
   api.h
-  cast.h
   context.h
   kernel.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/compute")
@@ -36,3 +35,6 @@ install(
 #######################################
 
 ADD_ARROW_TEST(compute-test)
+ADD_ARROW_BENCHMARK(compute-benchmark)
+
+add_subdirectory(kernels)
diff --git a/cpp/src/arrow/compute/api.h b/cpp/src/arrow/compute/api.h
index da7df1c..b3700b4 100644
--- a/cpp/src/arrow/compute/api.h
+++ b/cpp/src/arrow/compute/api.h
@@ -18,8 +18,10 @@
 #ifndef ARROW_COMPUTE_API_H
 #define ARROW_COMPUTE_API_H
 
-#include "arrow/compute/cast.h"
 #include "arrow/compute/context.h"
 #include "arrow/compute/kernel.h"
 
+#include "arrow/compute/kernels/cast.h"
+#include "arrow/compute/kernels/hash.h"
+
 #endif  // ARROW_COMPUTE_API_H
diff --git a/cpp/src/arrow/compute/compute-benchmark.cc b/cpp/src/arrow/compute/compute-benchmark.cc
new file mode 100644
index 0000000..974fffc
--- /dev/null
+++ b/cpp/src/arrow/compute/compute-benchmark.cc
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <vector>
+
+#include "arrow/builder.h"
+#include "arrow/memory_pool.h"
+#include "arrow/test-util.h"
+
+#include "arrow/compute/context.h"
+#include "arrow/compute/kernels/hash.h"
+
+namespace arrow {
+namespace compute {
+
+static void BM_BuildDictionary(benchmark::State& state) {  // NOLINT non-const reference
+  const int64_t iterations = 1024;
+  // Triangular input: row i appends 0..i-1, so low values repeat most often
+  std::vector<int64_t> values;
+  std::vector<bool> is_valid;
+  for (int64_t i = 0; i < iterations; i++) {
+    for (int64_t j = 0; j < i; j++) {
+      is_valid.push_back((i + j) % 9 == 0);  // only ~1 in 9 slots valid; rest are null
+      values.push_back(j);
+    }
+  }
+  // Build the input Array once, outside the timed loop
+  std::shared_ptr<Array> arr;
+  ArrayFromVector<Int64Type, int64_t>(is_valid, values, &arr);
+
+  FunctionContext ctx;
+  // Timed region: dictionary-encode the whole array once per benchmark iteration
+  while (state.KeepRunning()) {
+    Datum out;
+    ABORT_NOT_OK(DictionaryEncode(&ctx, Datum(arr), &out));
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(int64_t));
+}
+
+static void BM_BuildStringDictionary(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const int64_t iterations = 1024 * 64;
+  // Pre-render the distinct decimal strings "0".."65535" and total their byte length
+  std::vector<std::string> data;
+  data.reserve(static_cast<size_t>(iterations));
+  int64_t total_bytes = 0;
+  for (int64_t i = 0; i < iterations; i++) {
+    // std::to_string avoids constructing a std::stringstream per element
+    data.push_back(std::to_string(i));
+    total_bytes += static_cast<int64_t>(data.back().size());
+  }
+
+  // Build the input Array once, outside the timed loop
+  std::shared_ptr<Array> arr;
+  ArrayFromVector<StringType, std::string>(data, &arr);
+
+  FunctionContext ctx;
+
+  // Timed region: dictionary-encode the whole array once per benchmark iteration
+  while (state.KeepRunning()) {
+    Datum out;
+    ABORT_NOT_OK(DictionaryEncode(&ctx, Datum(arr), &out));
+  }
+  // Report throughput as the exact total bytes of string data encoded per iteration
+  state.SetBytesProcessed(state.iterations() * total_bytes);
+}
+
+BENCHMARK(BM_BuildDictionary)->Repetitions(3)->Unit(benchmark::kMicrosecond);
+BENCHMARK(BM_BuildStringDictionary)->Repetitions(3)->Unit(benchmark::kMicrosecond);
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/compute-test.cc b/cpp/src/arrow/compute/compute-test.cc
index 61d53c4..5eada39 100644
--- a/cpp/src/arrow/compute/compute-test.cc
+++ b/cpp/src/arrow/compute/compute-test.cc
@@ -37,10 +37,12 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 
-#include "arrow/compute/cast.h"
 #include "arrow/compute/context.h"
 #include "arrow/compute/kernel.h"
+#include "arrow/compute/kernels/cast.h"
+#include "arrow/compute/kernels/hash.h"
 
+using std::shared_ptr;
 using std::vector;
 
 namespace arrow {
@@ -54,6 +56,18 @@ class ComputeFixture {
   FunctionContext ctx_;
 };
 
+/// Build an Array of `type` from `values`. An empty `is_valid` means the
+/// array has no nulls; otherwise it is forwarded to ArrayFromVector as the
+/// per-slot validity.
+/// NOTE(review): identifiers starting with an underscore followed by an
+/// uppercase letter are reserved in C++; consider renaming at all call sites.
+template <typename Type, typename T>
+shared_ptr<Array> _MakeArray(const shared_ptr<DataType>& type, const vector<T>& values,
+                             const vector<bool>& is_valid) {
+  shared_ptr<Array> out;
+  if (is_valid.empty()) {
+    ArrayFromVector<Type, T>(type, values, &out);
+    return out;
+  }
+  ArrayFromVector<Type, T>(type, is_valid, values, &out);
+  return out;
+}
+
 // ----------------------------------------------------------------------
 // Cast
 
@@ -65,17 +79,17 @@ static void AssertBufferSame(const Array& left, const Array& right, int buffer_i
 class TestCast : public ComputeFixture, public TestBase {
  public:
   void CheckPass(const Array& input, const Array& expected,
-                 const std::shared_ptr<DataType>& out_type, const CastOptions& options) {
-    std::shared_ptr<Array> result;
+                 const shared_ptr<DataType>& out_type, const CastOptions& options) {
+    shared_ptr<Array> result;
     ASSERT_OK(Cast(&ctx_, input, out_type, options, &result));
     ASSERT_ARRAYS_EQUAL(expected, *result);
   }
 
   template <typename InType, typename I_TYPE>
-  void CheckFails(const std::shared_ptr<DataType>& in_type,
-                  const std::vector<I_TYPE>& in_values, const std::vector<bool>& is_valid,
-                  const std::shared_ptr<DataType>& out_type, const CastOptions& options) {
-    std::shared_ptr<Array> input, result;
+  void CheckFails(const shared_ptr<DataType>& in_type, const vector<I_TYPE>& in_values,
+                  const vector<bool>& is_valid, const shared_ptr<DataType>& out_type,
+                  const CastOptions& options) {
+    shared_ptr<Array> input, result;
     if (is_valid.size() > 0) {
       ArrayFromVector<InType, I_TYPE>(in_type, is_valid, in_values, &input);
     } else {
@@ -84,19 +98,18 @@ class TestCast : public ComputeFixture, public TestBase {
     ASSERT_RAISES(Invalid, Cast(&ctx_, *input, out_type, options, &result));
   }
 
-  void CheckZeroCopy(const Array& input, const std::shared_ptr<DataType>& out_type) {
-    std::shared_ptr<Array> result;
+  void CheckZeroCopy(const Array& input, const shared_ptr<DataType>& out_type) {
+    shared_ptr<Array> result;
     ASSERT_OK(Cast(&ctx_, input, out_type, {}, &result));
     AssertBufferSame(input, *result, 0);
     AssertBufferSame(input, *result, 1);
   }
 
   template <typename InType, typename I_TYPE, typename OutType, typename O_TYPE>
-  void CheckCase(const std::shared_ptr<DataType>& in_type,
-                 const std::vector<I_TYPE>& in_values, const std::vector<bool>& is_valid,
-                 const std::shared_ptr<DataType>& out_type,
-                 const std::vector<O_TYPE>& out_values, const CastOptions& options) {
-    std::shared_ptr<Array> input, expected;
+  void CheckCase(const shared_ptr<DataType>& in_type, const vector<I_TYPE>& in_values,
+                 const vector<bool>& is_valid, const shared_ptr<DataType>& out_type,
+                 const vector<O_TYPE>& out_values, const CastOptions& options) {
+    shared_ptr<Array> input, expected;
     if (is_valid.size() > 0) {
       ArrayFromVector<InType, I_TYPE>(in_type, is_valid, in_values, &input);
       ArrayFromVector<OutType, O_TYPE>(out_type, is_valid, out_values, &expected);
@@ -117,10 +130,10 @@ TEST_F(TestCast, SameTypeZeroCopy) {
   vector<bool> is_valid = {true, false, true, true, true};
   vector<int32_t> v1 = {0, 1, 2, 3, 4};
 
-  std::shared_ptr<Array> arr;
+  shared_ptr<Array> arr;
   ArrayFromVector<Int32Type, int32_t>(int32(), is_valid, v1, &arr);
 
-  std::shared_ptr<Array> result;
+  shared_ptr<Array> result;
   ASSERT_OK(Cast(&this->ctx_, *arr, int32(), {}, &result));
 
   AssertBufferSame(*arr, *result, 0);
@@ -185,7 +198,7 @@ TEST_F(TestCast, OverflowInNullSlot) {
   vector<int32_t> v11 = {0, 70000, 2000, 1000, 0};
   vector<int16_t> e11 = {0, 0, 2000, 1000, 0};
 
-  std::shared_ptr<Array> expected;
+  shared_ptr<Array> expected;
   ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, e11, &expected);
 
   auto buf = std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(v11.data()),
@@ -280,8 +293,8 @@ TEST_F(TestCast, TimestampToTimestamp) {
 
   auto CheckTimestampCast = [this](
       const CastOptions& options, TimeUnit::type from_unit, TimeUnit::type to_unit,
-      const std::vector<int64_t>& from_values, const std::vector<int64_t>& to_values,
-      const std::vector<bool>& is_valid) {
+      const vector<int64_t>& from_values, const vector<int64_t>& to_values,
+      const vector<bool>& is_valid) {
     CheckCase<TimestampType, int64_t, TimestampType, int64_t>(
         timestamp(from_unit), from_values, is_valid, timestamp(to_unit), to_values,
         options);
@@ -315,8 +328,8 @@ TEST_F(TestCast, TimestampToTimestamp) {
   CheckTimestampCast(options, TimeUnit::MICRO, TimeUnit::NANO, v6, e6, is_valid);
 
   // Zero copy
-  std::shared_ptr<Array> arr;
   vector<int64_t> v7 = {0, 70000, 2000, 1000, 0};
+  shared_ptr<Array> arr;
   ArrayFromVector<TimestampType, int64_t>(timestamp(TimeUnit::SECOND), is_valid, v7,
                                           &arr);
   CheckZeroCopy(*arr, timestamp(TimeUnit::SECOND));
@@ -456,8 +469,8 @@ TEST_F(TestCast, TimeToTime) {
       time64(TimeUnit::MICRO), v6, is_valid, time64(TimeUnit::NANO), e6, options);
 
   // Zero copy
-  std::shared_ptr<Array> arr;
   vector<int64_t> v7 = {0, 70000, 2000, 1000, 0};
+  shared_ptr<Array> arr;
   ArrayFromVector<Time64Type, int64_t>(time64(TimeUnit::MICRO), is_valid, v7, &arr);
   CheckZeroCopy(*arr, time64(TimeUnit::MICRO));
 
@@ -516,9 +529,9 @@ TEST_F(TestCast, DateToDate) {
                                                       e1, options);
 
   // Zero copy
-  std::shared_ptr<Array> arr;
   vector<int32_t> v2 = {0, 70000, 2000, 1000, 0};
   vector<int64_t> v3 = {0, 70000, 2000, 1000, 0};
+  shared_ptr<Array> arr;
   ArrayFromVector<Date32Type, int32_t>(date32(), is_valid, v2, &arr);
   CheckZeroCopy(*arr, date32());
 
@@ -561,22 +574,54 @@ TEST_F(TestCast, ToDouble) {
                                                    options);
 }
 
+TEST_F(TestCast, ChunkedArray) {
+  // Casting a ChunkedArray (int16 -> int64) must return a CHUNKED_ARRAY datum
+  // equal to the chunked array of the expected int64 values.
+  vector<int16_t> values1 = {0, 1, 2};
+  vector<int16_t> values2 = {3, 4, 5};
+
+  auto type = int16();
+  auto out_type = int64();
+
+  auto a1 = _MakeArray<Int16Type, int16_t>(type, values1, {});
+  auto a2 = _MakeArray<Int16Type, int16_t>(type, values2, {});
+
+  ArrayVector arrays = {a1, a2};
+  auto carr = std::make_shared<ChunkedArray>(arrays);
+
+  CastOptions options;
+
+  Datum out;
+  ASSERT_OK(Cast(&this->ctx_, Datum(carr), out_type, options, &out));
+  ASSERT_EQ(Datum::CHUNKED_ARRAY, out.kind());
+
+  auto out_carr = out.chunked_array();
+
+  vector<int64_t> ex_values1 = {0, 1, 2};
+  vector<int64_t> ex_values2 = {3, 4, 5};
+  auto a3 = _MakeArray<Int64Type, int64_t>(out_type, ex_values1, {});
+  auto a4 = _MakeArray<Int64Type, int64_t>(out_type, ex_values2, {});
+
+  ArrayVector ex_arrays = {a3, a4};
+  auto ex_carr = std::make_shared<ChunkedArray>(ex_arrays);
+
+  ASSERT_TRUE(out_carr->Equals(*ex_carr));
+}
+
 TEST_F(TestCast, UnsupportedTarget) {
+  // Casting int32 to utf8 is expected to be rejected with NotImplemented
   vector<bool> is_valid = {true, false, true, true, true};
   vector<int32_t> v1 = {0, 1, 2, 3, 4};
 
-  std::shared_ptr<Array> arr;
+  shared_ptr<Array> arr;
   ArrayFromVector<Int32Type, int32_t>(int32(), is_valid, v1, &arr);
 
-  std::shared_ptr<Array> result;
+  shared_ptr<Array> result;
   ASSERT_RAISES(NotImplemented, Cast(&this->ctx_, *arr, utf8(), {}, &result));
 }
 
 TEST_F(TestCast, DateTimeZeroCopy) {
   vector<bool> is_valid = {true, false, true, true, true};
 
-  std::shared_ptr<Array> arr;
   vector<int32_t> v1 = {0, 70000, 2000, 1000, 0};
+  shared_ptr<Array> arr;
   ArrayFromVector<Int32Type, int32_t>(int32(), is_valid, v1, &arr);
 
   CheckZeroCopy(*arr, time32(TimeUnit::SECOND));
@@ -596,7 +641,7 @@ TEST_F(TestCast, FromNull) {
 
   NullArray arr(length);
 
-  std::shared_ptr<Array> result;
+  shared_ptr<Array> result;
   ASSERT_OK(Cast(&ctx_, arr, int32(), {}, &result));
 
   ASSERT_EQ(length, result->length());
@@ -614,7 +659,7 @@ TEST_F(TestCast, PreallocatedMemory) {
 
   const int64_t length = 5;
 
-  std::shared_ptr<Array> arr;
+  shared_ptr<Array> arr;
   vector<int32_t> v1 = {0, 70000, 2000, 1000, 0};
   vector<int64_t> e1 = {0, 70000, 2000, 1000, 0};
   ArrayFromVector<Int32Type, int32_t>(int32(), is_valid, v1, &arr);
@@ -626,19 +671,20 @@ TEST_F(TestCast, PreallocatedMemory) {
 
   auto out_data = std::make_shared<ArrayData>(out_type, length);
 
-  std::shared_ptr<Buffer> out_values;
+  shared_ptr<Buffer> out_values;
   ASSERT_OK(this->ctx_.Allocate(length * sizeof(int64_t), &out_values));
 
   out_data->buffers.push_back(nullptr);
   out_data->buffers.push_back(out_values);
 
-  ASSERT_OK(kernel->Call(&this->ctx_, *arr, out_data.get()));
+  Datum out(out_data);
+  ASSERT_OK(kernel->Call(&this->ctx_, *arr->data(), &out));
 
   // Buffer address unchanged
   ASSERT_EQ(out_values.get(), out_data->buffers[1].get());
 
-  std::shared_ptr<Array> result = MakeArray(out_data);
-  std::shared_ptr<Array> expected;
+  shared_ptr<Array> result = MakeArray(out_data);
+  shared_ptr<Array> expected;
   ArrayFromVector<Int64Type, int64_t>(int64(), is_valid, e1, &expected);
 
   ASSERT_ARRAYS_EQUAL(*expected, *result);
@@ -656,13 +702,268 @@ TYPED_TEST_CASE(TestDictionaryCast, TestTypes);
 
 TYPED_TEST(TestDictionaryCast, Basic) {
+  // Round trip: dictionary-encode a random array, then cast it back to the
+  // plain type and expect the original values.
   CastOptions options;
-  std::shared_ptr<Array> plain_array =
+  shared_ptr<Array> plain_array =
+      TestBase::MakeRandomArray<typename TypeTraits<TypeParam>::ArrayType>(10, 2);
+
+  Datum out;
+  ASSERT_OK(DictionaryEncode(&this->ctx_, Datum(plain_array->data()), &out));
+
+  this->CheckPass(*MakeArray(out.array()), *plain_array, plain_array->type(), options);
+}
+
+/*TYPED_TEST(TestDictionaryCast, Reverse) {
+  CastOptions options;
+  shared_ptr<Array> plain_array =
       TestBase::MakeRandomArray<typename TypeTraits<TypeParam>::ArrayType>(10, 2);
 
-  std::shared_ptr<Array> dict_array;
+  shared_ptr<Array> dict_array;
   ASSERT_OK(EncodeArrayToDictionary(*plain_array, this->pool_, &dict_array));
 
-  this->CheckPass(*dict_array, *plain_array, plain_array->type(), options);
+  this->CheckPass(*plain_array, *dict_array, dict_array->type(), options);
+}*/
+
+// ----------------------------------------------------------------------
+// Dictionary tests
+
+/// Assert that Unique() over the array built from (in_values, in_is_valid)
+/// returns exactly the array built from (out_values, out_is_valid).
+/// An empty validity vector means the corresponding array has no nulls.
+template <typename Type, typename T>
+void CheckUnique(FunctionContext* ctx, const shared_ptr<DataType>& type,
+                 const vector<T>& in_values, const vector<bool>& in_is_valid,
+                 const vector<T>& out_values, const vector<bool>& out_is_valid) {
+  auto input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+  auto expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
+
+  shared_ptr<Array> actual;
+  ASSERT_OK(Unique(ctx, Datum(input), &actual));
+  ASSERT_ARRAYS_EQUAL(*expected, *actual);
+}
+
+/// Assert that DictionaryEncode() over the array built from
+/// (in_values, in_is_valid) yields a DictionaryArray with int32 indices
+/// `out_indices` over the dictionary built from (out_values, out_is_valid).
+/// The expected indices reuse the input validity, so `out_indices` needs one
+/// entry per input slot, including null slots.
+template <typename Type, typename T>
+void CheckDictEncode(FunctionContext* ctx, const shared_ptr<DataType>& type,
+                     const vector<T>& in_values, const vector<bool>& in_is_valid,
+                     const vector<T>& out_values, const vector<bool>& out_is_valid,
+                     const vector<int32_t>& out_indices) {
+  auto input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+
+  // Expected: int32 indices (validity mirrors the input) into the dictionary
+  auto expected_dict = _MakeArray<Type, T>(type, out_values, out_is_valid);
+  auto expected_indices =
+      _MakeArray<Int32Type, int32_t>(int32(), out_indices, in_is_valid);
+  DictionaryArray expected(dictionary(int32(), expected_dict), expected_indices);
+
+  Datum encoded;
+  ASSERT_OK(DictionaryEncode(ctx, Datum(input), &encoded));
+  auto actual = MakeArray(encoded.array());
+
+  ASSERT_ARRAYS_EQUAL(expected, *actual);
+}
+
+// Fixture for hash kernel tests that are not parameterized by type
+class TestHashKernel : public ComputeFixture, public TestBase {};
+
+// Typed fixture; gtest instantiates it once per type in PrimitiveDictionaries
+template <typename Type>
+class TestHashKernelPrimitive : public ComputeFixture, public TestBase {};
+
+// Primitive types exercised by the TestHashKernelPrimitive typed tests
+typedef ::testing::Types<Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
+                         UInt32Type, Int64Type, UInt64Type, FloatType, DoubleType,
+                         Date32Type, Date64Type>
+    PrimitiveDictionaries;
+
+TYPED_TEST_CASE(TestHashKernelPrimitive, PrimitiveDictionaries);
+
+TYPED_TEST(TestHashKernelPrimitive, Unique) {
+  using T = typename TypeParam::c_type;
+  // Input [2, null, 2, 1] -> [2, 1]: distinct non-null values in order of
+  // first appearance, with no nulls in the result.
+  const vector<T> in_values = {2, 1, 2, 1};
+  const vector<bool> in_is_valid = {true, false, true, true};
+  const vector<T> expected = {2, 1};
+  CheckUnique<TypeParam, T>(&this->ctx_, TypeTraits<TypeParam>::type_singleton(),
+                            in_values, in_is_valid, expected, {});
+}
+
+TYPED_TEST(TestHashKernelPrimitive, DictEncode) {
+  using T = typename TypeParam::c_type;
+  // Input [2, null, 2, 1, 2, 3] -> dictionary [2, 1, 3]; the placeholder
+  // index for the null slot is masked by the input validity.
+  const vector<T> in_values = {2, 1, 2, 1, 2, 3};
+  const vector<bool> in_is_valid = {true, false, true, true, true, true};
+  const vector<T> expected_dict = {2, 1, 3};
+  const vector<int32_t> expected_indices = {0, 0, 0, 1, 0, 2};
+  CheckDictEncode<TypeParam, T>(&this->ctx_, TypeTraits<TypeParam>::type_singleton(),
+                                in_values, in_is_valid, expected_dict, {},
+                                expected_indices);
+}
+
+TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable) {
+  using T = typename TypeParam::c_type;
+  // Skip this test for (u)int8, which cannot hold kTotalValues distinct values.
+  // The guard must check the element type T: compute::Scalar is an empty
+  // placeholder struct (sizeof 1 on common compilers), so checking it skipped
+  // the test for every type.
+  if (sizeof(T) == 1) {
+    return;
+  }
+
+  const int64_t kTotalValues = 10000;
+  const int64_t kRepeats = 10;
+
+  // `values` cycles through 0..kTotalValues-1 kRepeats times. `uniques` holds
+  // the first cycle, and `indices` holds each value's expected dictionary slot.
+  vector<T> values;
+  vector<T> uniques;
+  vector<int32_t> indices;
+  for (int64_t i = 0; i < kTotalValues * kRepeats; i++) {
+    const auto val = static_cast<T>(i % kTotalValues);
+    values.push_back(val);
+
+    if (i < kTotalValues) {
+      uniques.push_back(val);
+    }
+    indices.push_back(static_cast<int32_t>(i % kTotalValues));
+  }
+
+  auto type = TypeTraits<TypeParam>::type_singleton();
+  CheckUnique<TypeParam, T>(&this->ctx_, type, values, {}, uniques, {});
+
+  CheckDictEncode<TypeParam, T>(&this->ctx_, type, values, {}, uniques, {}, indices);
+}
+
+TEST_F(TestHashKernel, UniqueTimeTimestamp) {
+  // Input [2, null, 2, 1] -> [2, 1] for each temporal type
+  const vector<bool> is_valid = {true, false, true, true};
+
+  CheckUnique<Time32Type, int32_t>(&this->ctx_, time32(TimeUnit::SECOND), {2, 1, 2, 1},
+                                   is_valid, {2, 1}, {});
+  CheckUnique<Time64Type, int64_t>(&this->ctx_, time64(TimeUnit::NANO), {2, 1, 2, 1},
+                                   is_valid, {2, 1}, {});
+  CheckUnique<TimestampType, int64_t>(&this->ctx_, timestamp(TimeUnit::NANO),
+                                      {2, 1, 2, 1}, is_valid, {2, 1}, {});
+}
+
+TEST_F(TestHashKernel, UniqueBinary) {
+  // Input [test, null, test2, test] -> [test, test2] for binary and utf8
+  const vector<std::string> in_values = {"test", "", "test2", "test"};
+  const vector<bool> in_is_valid = {true, false, true, true};
+  const vector<std::string> expected = {"test", "test2"};
+
+  CheckUnique<BinaryType, std::string>(&this->ctx_, binary(), in_values, in_is_valid,
+                                       expected, {});
+  CheckUnique<StringType, std::string>(&this->ctx_, utf8(), in_values, in_is_valid,
+                                       expected, {});
+}
+
+TEST_F(TestHashKernel, DictEncodeBinary) {
+  // Input [test, null, test2, test, baz] -> dictionary [test, test2, baz]
+  const vector<std::string> in_values = {"test", "", "test2", "test", "baz"};
+  const vector<bool> in_is_valid = {true, false, true, true, true};
+  const vector<std::string> expected_dict = {"test", "test2", "baz"};
+  const vector<int32_t> expected_indices = {0, 0, 1, 0, 2};
+
+  CheckDictEncode<BinaryType, std::string>(&this->ctx_, binary(), in_values,
+                                           in_is_valid, expected_dict, {},
+                                           expected_indices);
+  CheckDictEncode<StringType, std::string>(&this->ctx_, utf8(), in_values, in_is_valid,
+                                           expected_dict, {}, expected_indices);
+}
+
+TEST_F(TestHashKernel, BinaryResizeTable) {
+  // kTotalValues distinct strings "test0".."test9999", cycled kRepeats times;
+  // presumably large enough to force the hash table to resize.
+  const int64_t kTotalValues = 10000;
+  const int64_t kRepeats = 10;
+
+  vector<std::string> uniques;
+  for (int64_t k = 0; k < kTotalValues; ++k) {
+    uniques.push_back("test" + std::to_string(k));
+  }
+
+  vector<std::string> values;
+  vector<int32_t> indices;
+  for (int64_t rep = 0; rep < kRepeats; ++rep) {
+    int32_t index = 0;
+    for (const std::string& val : uniques) {
+      values.push_back(val);
+      indices.push_back(index++);
+    }
+  }
+
+  CheckUnique<BinaryType, std::string>(&this->ctx_, binary(), values, {}, uniques, {});
+  CheckDictEncode<BinaryType, std::string>(&this->ctx_, binary(), values, {}, uniques, {},
+                                           indices);
+
+  CheckUnique<StringType, std::string>(&this->ctx_, utf8(), values, {}, uniques, {});
+  CheckDictEncode<StringType, std::string>(&this->ctx_, utf8(), values, {}, uniques, {},
+                                           indices);
+}
+
+TEST_F(TestHashKernel, UniqueFixedSizeBinary) {
+  // Input [aaaaa, null, bbbbb, aaaaa] -> [aaaaa, bbbbb]
+  const auto type = fixed_size_binary(5);
+  const vector<std::string> in_values = {"aaaaa", "", "bbbbb", "aaaaa"};
+  const vector<bool> in_is_valid = {true, false, true, true};
+  const vector<std::string> expected = {"aaaaa", "bbbbb"};
+  CheckUnique<FixedSizeBinaryType, std::string>(&this->ctx_, type, in_values,
+                                                in_is_valid, expected, {});
+}
+
+TEST_F(TestHashKernel, DictEncodeFixedSizeBinary) {
+  // Input [bbbbb, null, bbbbb, aaaaa, ccccc] -> dictionary [bbbbb, aaaaa, ccccc]
+  const auto type = fixed_size_binary(5);
+  const vector<std::string> in_values = {"bbbbb", "", "bbbbb", "aaaaa", "ccccc"};
+  const vector<bool> in_is_valid = {true, false, true, true, true};
+  const vector<std::string> expected_dict = {"bbbbb", "aaaaa", "ccccc"};
+  const vector<int32_t> expected_indices = {0, 0, 0, 1, 2};
+  CheckDictEncode<FixedSizeBinaryType, std::string>(&this->ctx_, type, in_values,
+                                                    in_is_valid, expected_dict, {},
+                                                    expected_indices);
+}
+
+TEST_F(TestHashKernel, FixedSizeBinaryResizeTable) {
+  // kTotalValues distinct 6-byte keys: "test" followed by the two bytes
+  // (k / 128, k % 128), cycled kRepeats times; presumably enough to force
+  // the hash table to resize.
+  const int64_t kTotalValues = 10000;
+  const int64_t kRepeats = 10;
+
+  vector<std::string> uniques;
+  for (int64_t k = 0; k < kTotalValues; ++k) {
+    std::string val = "test";
+    val.push_back(static_cast<char>(k / 128));
+    val.push_back(static_cast<char>(k % 128));
+    uniques.push_back(val);
+  }
+
+  vector<std::string> values;
+  vector<int32_t> indices;
+  for (int64_t rep = 0; rep < kRepeats; ++rep) {
+    int32_t index = 0;
+    for (const std::string& val : uniques) {
+      values.push_back(val);
+      indices.push_back(index++);
+    }
+  }
+
+  auto type = fixed_size_binary(6);
+  CheckUnique<FixedSizeBinaryType, std::string>(&this->ctx_, type, values, {}, uniques,
+                                                {});
+  CheckDictEncode<FixedSizeBinaryType, std::string>(&this->ctx_, type, values, {},
+                                                    uniques, {}, indices);
+}
+
+TEST_F(TestHashKernel, UniqueDecimal) {
+  // decimal(2, 0) input [12, null, 11, 12] -> [12, 11]
+  const auto type = decimal(2, 0);
+  const vector<Decimal128> in_values{12, 12, 11, 12};
+  const vector<bool> in_is_valid = {true, false, true, true};
+  const vector<Decimal128> expected{12, 11};
+
+  CheckUnique<Decimal128Type, Decimal128>(&this->ctx_, type, in_values, in_is_valid,
+                                          expected, {});
+}
+
+TEST_F(TestHashKernel, DictEncodeDecimal) {
+  // decimal(2, 0) input [12, null, 11, 12, 13] -> dictionary [12, 11, 13]
+  const auto type = decimal(2, 0);
+  const vector<Decimal128> in_values{12, 12, 11, 12, 13};
+  const vector<bool> in_is_valid = {true, false, true, true, true};
+  const vector<Decimal128> expected_dict{12, 11, 13};
+  const vector<int32_t> expected_indices = {0, 0, 1, 0, 2};
+
+  CheckDictEncode<Decimal128Type, Decimal128>(&this->ctx_, type, in_values, in_is_valid,
+                                              expected_dict, {}, expected_indices);
+}
+
+TEST_F(TestHashKernel, ChunkedArrayInvoke) {
+  // Both kernels accept a ChunkedArray and build one dictionary across all
+  // chunks, in order of first appearance.
+  auto type = utf8();
+  auto chunk1 = _MakeArray<StringType, std::string>(type, {"foo", "bar", "foo"}, {});
+  auto chunk2 =
+      _MakeArray<StringType, std::string>(type, {"bar", "baz", "quuux", "foo"}, {});
+  auto chunked = std::make_shared<ChunkedArray>(ArrayVector{chunk1, chunk2});
+
+  auto expected_dict =
+      _MakeArray<StringType, std::string>(type, {"foo", "bar", "baz", "quuux"}, {});
+
+  // Unique over all chunks
+  shared_ptr<Array> uniques;
+  ASSERT_OK(Unique(&this->ctx_, Datum(chunked), &uniques));
+  ASSERT_ARRAYS_EQUAL(*expected_dict, *uniques);
+
+  // Dictionary encoding yields a ChunkedArray of DictionaryArrays that all
+  // share the same dictionary type
+  auto dict_type = dictionary(int32(), expected_dict);
+  auto indices1 = _MakeArray<Int32Type, int32_t>(int32(), {0, 1, 0}, {});
+  auto indices2 = _MakeArray<Int32Type, int32_t>(int32(), {1, 2, 3, 0}, {});
+  auto expected_encoded = std::make_shared<ChunkedArray>(
+      ArrayVector{std::make_shared<DictionaryArray>(dict_type, indices1),
+                  std::make_shared<DictionaryArray>(dict_type, indices2)});
+
+  Datum encoded;
+  ASSERT_OK(DictionaryEncode(&this->ctx_, Datum(chunked), &encoded));
+  ASSERT_EQ(Datum::CHUNKED_ARRAY, encoded.kind());
+
+  ASSERT_TRUE(encoded.chunked_array()->Equals(*expected_encoded));
 }
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/context.cc b/cpp/src/arrow/compute/context.cc
index 792dc4f..63aa341 100644
--- a/cpp/src/arrow/compute/context.cc
+++ b/cpp/src/arrow/compute/context.cc
@@ -20,11 +20,16 @@
 #include <memory>
 
 #include "arrow/buffer.h"
+#include "arrow/util/cpu-info.h"
 
 namespace arrow {
 namespace compute {
 
-FunctionContext::FunctionContext(MemoryPool* pool) : pool_(pool) {}
+// Stores `pool` (returned by memory_pool()) and initializes the process-wide
+// CpuInfo if that has not happened yet (presumably so kernels can query CPU
+// features). NOTE(review): initialized() and Init() are two separate calls, so
+// contexts constructed concurrently on first use could both run Init() —
+// confirm that Init() tolerates this.
+FunctionContext::FunctionContext(MemoryPool* pool) : pool_(pool) {
+  if (!::arrow::CpuInfo::initialized()) {
+    ::arrow::CpuInfo::Init();
+  }
+}
 
+// The memory pool supplied at construction
 MemoryPool* FunctionContext::memory_pool() const { return pool_; }
 
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 4e072a7..0037245 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -18,7 +18,14 @@
 #ifndef ARROW_COMPUTE_KERNEL_H
 #define ARROW_COMPUTE_KERNEL_H
 
+#include <memory>
+#include <vector>
+
 #include "arrow/array.h"
+#include "arrow/table.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/variant.h"
+#include "arrow/util/visibility.h"
 
 namespace arrow {
 namespace compute {
@@ -32,11 +39,99 @@ class ARROW_EXPORT OpKernel {
   virtual ~OpKernel() = default;
 };
 
+/// \brief Placeholder for Scalar values until we implement these
+///
+/// It has no data members yet, so sizeof(Scalar) is just the minimal nonzero
+/// object size (1 on common compilers). It does not describe any value width.
+/// NOTE(review): if ARROW_DISALLOW_COPY_AND_ASSIGN deletes the copy
+/// constructor, as its name suggests, no implicit default constructor is
+/// generated either. Scalar then cannot be instantiated yet and is only named
+/// through std::shared_ptr<Scalar> (e.g. in Datum).
+struct ARROW_EXPORT Scalar {
+  ~Scalar() {}
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Scalar);
+};
+
+/// \class Datum
+/// \brief Variant type for various Arrow C++ data structures
+///
+/// A Datum holds exactly one of:
+/// - nothing (kind() == NONE),
+/// - a Scalar, an ArrayData, a ChunkedArray, a RecordBatch or a Table,
+/// - a collection (vector) of Datums.
+/// The typed accessors (array(), chunked_array(), collection()) call util::get
+/// for one specific alternative, so check kind() first.
+/// NOTE(review): what util::get does on a mismatched alternative (throw or
+/// abort) depends on util::variant — confirm.
+struct ARROW_EXPORT Datum {
+  // Enumerator order matches the alternative order of `value`. kind() maps
+  // value.which() onto this enum by position, so keep the two in sync.
+  enum type { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE, COLLECTION };
+
+  // The null-pointer alternative (index 0) marks an empty Datum.
+  util::variant<decltype(NULLPTR), std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
+                std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
+                std::shared_ptr<Table>, std::vector<Datum>>
+      value;
+
+  /// \brief Empty datum, to be populated elsewhere
+  Datum() : value(nullptr) {}
+
+  explicit Datum(const std::shared_ptr<Scalar>& value) : value(value) {}
+
+  explicit Datum(const std::shared_ptr<ArrayData>& value) : value(value) {}
+
+  /// \brief Wrap an Array by storing its ArrayData; kind() is then ARRAY
+  ///
+  /// `value` is dereferenced, so it must not be null.
+  explicit Datum(const std::shared_ptr<Array>& value) : Datum(value->data()) {}
+
+  explicit Datum(const std::shared_ptr<ChunkedArray>& value) : value(value) {}
+
+  explicit Datum(const std::shared_ptr<RecordBatch>& value) : value(value) {}
+
+  explicit Datum(const std::shared_ptr<Table>& value) : value(value) {}
+
+  explicit Datum(const std::vector<Datum>& value) : value(value) {}
+
+  ~Datum() {}
+
+  // Copies the held alternative.
+  // NOTE(review): `value` is first default-constructed and then
+  // copy-assigned. The constructor is noexcept although copying a
+  // std::vector<Datum> alternative can allocate; a throw here would call
+  // std::terminate. Because a copy constructor and a destructor are
+  // user-declared, no implicit move operations exist, so moving a Datum
+  // copies it.
+  Datum(const Datum& other) noexcept { this->value = other.value; }
+
+  /// \brief Which alternative is currently held
+  ///
+  /// The case labels are the positions of the alternatives in `value`; the
+  /// default branch is not reached for the seven declared alternatives.
+  Datum::type kind() const {
+    switch (this->value.which()) {
+      case 0:
+        return Datum::NONE;
+      case 1:
+        return Datum::SCALAR;
+      case 2:
+        return Datum::ARRAY;
+      case 3:
+        return Datum::CHUNKED_ARRAY;
+      case 4:
+        return Datum::RECORD_BATCH;
+      case 5:
+        return Datum::TABLE;
+      case 6:
+        return Datum::COLLECTION;
+      default:
+        return Datum::NONE;
+    }
+  }
+
+  /// \brief The held ArrayData (requires kind() == ARRAY)
+  std::shared_ptr<ArrayData> array() const {
+    return util::get<std::shared_ptr<ArrayData>>(this->value);
+  }
+
+  /// \brief The held ChunkedArray (requires kind() == CHUNKED_ARRAY)
+  std::shared_ptr<ChunkedArray> chunked_array() const {
+    return util::get<std::shared_ptr<ChunkedArray>>(this->value);
+  }
+
+  /// \brief A copy of the held collection (requires kind() == COLLECTION)
+  const std::vector<Datum> collection() const {
+    return util::get<std::vector<Datum>>(this->value);
+  }
+
+  /// \brief True if this holds an ARRAY or a CHUNKED_ARRAY
+  bool is_arraylike() const {
+    return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY;
+  }
+
+  /// \brief The value type of the variant, if any
+  ///
+  /// Only ARRAY and CHUNKED_ARRAY report a type; every other kind (including
+  /// RECORD_BATCH and TABLE) yields nullptr.
+  ///
+  /// \return nullptr if no type
+  std::shared_ptr<DataType> type() const {
+    if (this->kind() == Datum::ARRAY) {
+      return util::get<std::shared_ptr<ArrayData>>(this->value)->type;
+    } else if (this->kind() == Datum::CHUNKED_ARRAY) {
+      return util::get<std::shared_ptr<ChunkedArray>>(this->value)->type();
+    }
+    return nullptr;
+  }
+};
+
 /// \class UnaryKernel
 /// \brief An array-valued function of a single input argument
 class ARROW_EXPORT UnaryKernel : public OpKernel {
  public:
-  virtual Status Call(FunctionContext* ctx, const Array& input, ArrayData* out) = 0;
+  /// \brief Apply the kernel to `input`, storing the result in `out`
+  ///
+  /// \param[in] ctx function context (memory pool, error status)
+  /// \param[in] input the argument, as ArrayData
+  /// \param[in,out] out the result. It may already wrap preallocated ArrayData
+  /// for the kernel to fill in place (NOTE(review): confirm which kernels
+  /// honor this).
+  virtual Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) = 0;
 };
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt
similarity index 63%
copy from cpp/src/arrow/compute/CMakeLists.txt
copy to cpp/src/arrow/compute/kernels/CMakeLists.txt
index 4589afb..715e6c6 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt
@@ -15,24 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Headers: top level
 install(FILES
-  api.h
   cast.h
-  context.h
-  kernel.h
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/compute")
-
-# pkg-config support
-configure_file(arrow-compute.pc.in
-  "${CMAKE_CURRENT_BINARY_DIR}/arrow-compute.pc"
-  @ONLY)
-install(
-  FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-compute.pc"
-  DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
-
-#######################################
-# Unit tests
-#######################################
-
-ADD_ARROW_TEST(compute-test)
+  hash.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/compute/kernels")
diff --git a/cpp/src/arrow/compute/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
similarity index 82%
rename from cpp/src/arrow/compute/cast.cc
rename to cpp/src/arrow/compute/kernels/cast.cc
index 114ab9a..6a42ec8 100644
--- a/cpp/src/arrow/compute/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/cast.h"
 
 #include <cstdint>
 #include <cstring>
@@ -26,6 +26,7 @@
 #include <string>
 #include <type_traits>
 #include <utility>
+#include <vector>
 
 #include "arrow/array.h"
 #include "arrow/buffer.h"
@@ -39,6 +40,7 @@
 
 #include "arrow/compute/context.h"
 #include "arrow/compute/kernel.h"
+#include "arrow/compute/kernels/util-internal.h"
 
 #ifdef ARROW_EXTRA_ERROR_CONTEXT
 
@@ -71,29 +73,6 @@ namespace compute {
 
 constexpr int64_t kMillisecondsInDay = 86400000;
 
-template <typename T>
-inline const T* GetValues(const ArrayData& data, int i) {
-  return reinterpret_cast<const T*>(data.buffers[i]->data()) + data.offset;
-}
-
-template <typename T>
-inline T* GetMutableValues(const ArrayData* data, int i) {
-  return reinterpret_cast<T*>(data->buffers[i]->mutable_data()) + data->offset;
-}
-
-namespace {
-
-void CopyData(const Array& input, ArrayData* output) {
-  auto in_data = input.data();
-  output->length = in_data->length;
-  output->null_count = input.null_count();
-  output->buffers = in_data->buffers;
-  output->offset = in_data->offset;
-  output->child_data = in_data->child_data;
-}
-
-}  // namespace
-
 // ----------------------------------------------------------------------
 // Zero copy casts
 
@@ -128,8 +107,8 @@ struct CastFunctor {};
 // Indicated no computation required
 template <typename O, typename I>
 struct CastFunctor<O, I, typename std::enable_if<is_zero_copy_cast<O, I>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  // Zero-copy cast: no new values are computed; CopyData fills `output` from
+  // `input`. NOTE(review): this assumes the helper is shallow (shares
+  // buffers), like the CopyData removed from cast.cc in this change — confirm
+  // in kernels/util-internal.h.
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     CopyData(input, output);
   }
 };
@@ -140,8 +119,8 @@ struct CastFunctor<O, I, typename std::enable_if<is_zero_copy_cast<O, I>::value>
 template <typename T>
 struct CastFunctor<T, NullType, typename std::enable_if<
                                     std::is_base_of<FixedWidthType, T>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     // Simply initialize data to 0
     auto buf = output->buffers[1];
     DCHECK_EQ(output->offset, 0);
@@ -151,8 +130,8 @@ struct CastFunctor<T, NullType, typename std::enable_if<
 
 template <>
 struct CastFunctor<NullType, DictionaryType> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {}
+  // Dictionary -> Null cast: the body is empty, so `output` is left untouched.
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {}
 };
 
 // ----------------------------------------------------------------------
@@ -160,19 +139,17 @@ struct CastFunctor<NullType, DictionaryType> {
 
 // Cast from Boolean to other numbers
 template <typename T>
-struct CastFunctor<T, BooleanType,
-                   typename std::enable_if<std::is_base_of<Number, T>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+struct CastFunctor<T, BooleanType, enable_if_number<T>> {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using c_type = typename T::c_type;
     constexpr auto kOne = static_cast<c_type>(1);
     constexpr auto kZero = static_cast<c_type>(0);
 
-    auto in_data = input.data();
-    internal::BitmapReader bit_reader(in_data->buffers[1]->data(), in_data->offset,
-                                      in_data->length);
+    internal::BitmapReader bit_reader(input.buffers[1]->data(), input.offset,
+                                      input.length);
     auto out = GetMutableValues<c_type>(output, 1);
-    for (int64_t i = 0; i < input.length(); ++i) {
+    for (int64_t i = 0; i < input.length; ++i) {
       *out++ = bit_reader.IsSet() ? kOne : kZero;
       bit_reader.Next();
     }
@@ -216,14 +193,14 @@ template <typename O, typename I>
 struct CastFunctor<O, I, typename std::enable_if<std::is_same<BooleanType, O>::value &&
                                                  std::is_base_of<Number, I>::value &&
                                                  !std::is_same<O, I>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using in_type = typename I::c_type;
     DCHECK_EQ(output->offset, 0);
 
-    const in_type* in_data = GetValues<in_type>(*input.data(), 1);
+    const in_type* in_data = GetValues<in_type>(input, 1);
     uint8_t* out_data = GetMutableValues<uint8_t>(output, 1);
-    for (int64_t i = 0; i < input.length(); ++i) {
+    for (int64_t i = 0; i < input.length; ++i) {
       BitUtil::SetBitTo(out_data, i, (*in_data++) != 0);
     }
   }
@@ -232,25 +209,26 @@ struct CastFunctor<O, I, typename std::enable_if<std::is_same<BooleanType, O>::v
 template <typename O, typename I>
 struct CastFunctor<O, I,
                    typename std::enable_if<is_integer_downcast<O, I>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using in_type = typename I::c_type;
     using out_type = typename O::c_type;
     DCHECK_EQ(output->offset, 0);
 
-    auto in_offset = input.offset();
+    auto in_offset = input.offset;
 
-    const in_type* in_data = GetValues<in_type>(*input.data(), 1);
+    const in_type* in_data = GetValues<in_type>(input, 1);
     auto out_data = GetMutableValues<out_type>(output, 1);
 
     if (!options.allow_int_overflow) {
       constexpr in_type kMax = static_cast<in_type>(std::numeric_limits<out_type>::max());
       constexpr in_type kMin = static_cast<in_type>(std::numeric_limits<out_type>::min());
 
-      if (input.null_count() > 0) {
-        internal::BitmapReader is_valid_reader(input.data()->buffers[0]->data(),
-                                               in_offset, input.length());
-        for (int64_t i = 0; i < input.length(); ++i) {
+      // Null count may be -1 if the input array had been sliced
+      if (input.null_count != 0) {
+        internal::BitmapReader is_valid_reader(input.buffers[0]->data(), in_offset,
+                                               input.length);
+        for (int64_t i = 0; i < input.length; ++i) {
           if (ARROW_PREDICT_FALSE(is_valid_reader.IsSet() &&
                                   (*in_data > kMax || *in_data < kMin))) {
             ctx->SetStatus(Status::Invalid("Integer value out of bounds"));
@@ -259,7 +237,7 @@ struct CastFunctor<O, I,
           is_valid_reader.Next();
         }
       } else {
-        for (int64_t i = 0; i < input.length(); ++i) {
+        for (int64_t i = 0; i < input.length; ++i) {
           if (ARROW_PREDICT_FALSE(*in_data > kMax || *in_data < kMin)) {
             ctx->SetStatus(Status::Invalid("Integer value out of bounds"));
           }
@@ -267,7 +245,7 @@ struct CastFunctor<O, I,
         }
       }
     } else {
-      for (int64_t i = 0; i < input.length(); ++i) {
+      for (int64_t i = 0; i < input.length; ++i) {
         *out_data++ = static_cast<out_type>(*in_data++);
       }
     }
@@ -278,14 +256,14 @@ template <typename O, typename I>
 struct CastFunctor<O, I,
                    typename std::enable_if<is_numeric_cast<O, I>::value &&
                                            !is_integer_downcast<O, I>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using in_type = typename I::c_type;
     using out_type = typename O::c_type;
 
-    const in_type* in_data = GetValues<in_type>(*input.data(), 1);
+    const in_type* in_data = GetValues<in_type>(input, 1);
     auto out_data = GetMutableValues<out_type>(output, 1);
-    for (int64_t i = 0; i < input.length(); ++i) {
+    for (int64_t i = 0; i < input.length; ++i) {
       *out_data++ = static_cast<out_type>(*in_data++);
     }
   }
@@ -296,34 +274,52 @@ struct CastFunctor<O, I,
 
+// Converts each value of `input` into `output` using `factor`:
+//   factor == 1   -> plain static_cast to out_type
+//   is_multiply   -> value * factor (NOTE(review): no overflow check here)
+//   otherwise     -> value / factor; unless options.allow_time_truncate is set,
+//                    the first non-null value with a non-zero remainder sets an
+//                    Invalid status on ctx and stops the loop (values already
+//                    written to output are left in place).
 template <typename in_type, typename out_type>
 void ShiftTime(FunctionContext* ctx, const CastOptions& options, const bool is_multiply,
-               const int64_t factor, const Array& input, ArrayData* output) {
-  const in_type* in_data = GetValues<in_type>(*input.data(), 1);
+               const int64_t factor, const ArrayData& input, ArrayData* output) {
+  const in_type* in_data = GetValues<in_type>(input, 1);
   auto out_data = GetMutableValues<out_type>(output, 1);
 
   if (factor == 1) {
-    for (int64_t i = 0; i < input.length(); i++) {
+    for (int64_t i = 0; i < input.length; i++) {
       out_data[i] = static_cast<out_type>(in_data[i]);
     }
   } else if (is_multiply) {
-    for (int64_t i = 0; i < input.length(); i++) {
+    for (int64_t i = 0; i < input.length; i++) {
       out_data[i] = static_cast<out_type>(in_data[i] * factor);
     }
   } else {
     if (options.allow_time_truncate) {
-      for (int64_t i = 0; i < input.length(); i++) {
+      for (int64_t i = 0; i < input.length; i++) {
         out_data[i] = static_cast<out_type>(in_data[i] / factor);
       }
     } else {
-      for (int64_t i = 0; i < input.length(); i++) {
-        out_data[i] = static_cast<out_type>(in_data[i] / factor);
-        if (input.IsValid(i) && (out_data[i] * factor != in_data[i])) {
-          std::stringstream ss;
-          ss << "Casting from " << input.type()->ToString() << " to "
-             << output->type->ToString() << " would lose data: " << in_data[i];
-          ctx->SetStatus(Status::Invalid(ss.str()));
-          break;
+// Records "would lose data" for VAL as Status::Invalid on ctx. Expands to several
+// statements (declaring a local `ss`), so it is only used inside braces below.
+// Expects `input`, `output` and `ctx` to be in scope.
+#define RAISE_INVALID_CAST(VAL)                                                         \
+  std::stringstream ss;                                                                 \
+  ss << "Casting from " << input.type->ToString() << " to " << output->type->ToString() \
+     << " would lose data: " << VAL;                                                    \
+  ctx->SetStatus(Status::Invalid(ss.str()));
+
+      // Consult the validity bitmap whenever null_count is non-zero (it may be
+      // -1 when unknown) so that null slots never raise.
+      // NOTE(review): assumes buffers[0] is non-null whenever null_count != 0.
+      if (input.null_count != 0) {
+        internal::BitmapReader bit_reader(input.buffers[0]->data(), input.offset,
+                                          input.length);
+        for (int64_t i = 0; i < input.length; i++) {
+          out_data[i] = static_cast<out_type>(in_data[i] / factor);
+          if (bit_reader.IsSet() && (out_data[i] * factor != in_data[i])) {
+            RAISE_INVALID_CAST(in_data[i]);
+            break;
+          }
+          bit_reader.Next();
+        }
+      } else {
+        // No nulls: every value is checked for a lossy division
+        for (int64_t i = 0; i < input.length; i++) {
+          out_data[i] = static_cast<out_type>(in_data[i] / factor);
+          if (out_data[i] * factor != in_data[i]) {
+            RAISE_INVALID_CAST(in_data[i]);
+            break;
+          }
         }
       }
+
+#undef RAISE_INVALID_CAST
     }
   }
 }
@@ -342,10 +338,10 @@ const std::pair<bool, int64_t> kTimeConversionTable[4][4] = {
 
 template <>
 struct CastFunctor<TimestampType, TimestampType> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     // If units are the same, zero copy, otherwise convert
-    const auto& in_type = static_cast<const TimestampType&>(*input.type());
+    const auto& in_type = static_cast<const TimestampType&>(*input.type);
     const auto& out_type = static_cast<const TimestampType&>(*output->type);
 
     if (in_type.unit() == out_type.unit()) {
@@ -364,9 +360,9 @@ struct CastFunctor<TimestampType, TimestampType> {
 
 template <>
 struct CastFunctor<Date32Type, TimestampType> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
-    const auto& in_type = static_cast<const TimestampType&>(*input.type());
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
+    const auto& in_type = static_cast<const TimestampType&>(*input.type);
 
     static const int64_t kTimestampToDateFactors[4] = {
         86400LL,                             // SECOND
@@ -382,9 +378,9 @@ struct CastFunctor<Date32Type, TimestampType> {
 
 template <>
 struct CastFunctor<Date64Type, TimestampType> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
-    const auto& in_type = static_cast<const TimestampType&>(*input.type());
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
+    const auto& in_type = static_cast<const TimestampType&>(*input.type);
 
     std::pair<bool, int64_t> conversion =
         kTimeConversionTable[static_cast<int>(in_type.unit())]
@@ -393,17 +389,21 @@ struct CastFunctor<Date64Type, TimestampType> {
     ShiftTime<int64_t, int64_t>(ctx, options, conversion.first, conversion.second, input,
                                 output);
 
+    internal::BitmapReader bit_reader(input.buffers[0]->data(), input.offset,
+                                      input.length);
+
     // Ensure that intraday milliseconds have been zeroed out
     auto out_data = GetMutableValues<int64_t>(output, 1);
-    for (int64_t i = 0; i < input.length(); ++i) {
+    for (int64_t i = 0; i < input.length; ++i) {
       const int64_t remainder = out_data[i] % kMillisecondsInDay;
-      if (ARROW_PREDICT_FALSE(!options.allow_time_truncate && input.IsValid(i) &&
+      if (ARROW_PREDICT_FALSE(!options.allow_time_truncate && bit_reader.IsSet() &&
                               remainder > 0)) {
         ctx->SetStatus(
             Status::Invalid("Timestamp value had non-zero intraday milliseconds"));
         break;
       }
       out_data[i] -= remainder;
+      bit_reader.Next();
     }
   }
 };
@@ -415,13 +415,13 @@ template <typename O, typename I>
 struct CastFunctor<O, I,
                    typename std::enable_if<std::is_base_of<TimeType, I>::value &&
                                            std::is_base_of<TimeType, O>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using in_t = typename I::c_type;
     using out_t = typename O::c_type;
 
     // If units are the same, zero copy, otherwise convert
-    const auto& in_type = static_cast<const I&>(*input.type());
+    const auto& in_type = static_cast<const I&>(*input.type);
     const auto& out_type = static_cast<const O&>(*output->type);
 
     if (in_type.unit() == out_type.unit()) {
@@ -443,16 +443,16 @@ struct CastFunctor<O, I,
 
 template <>
 struct CastFunctor<Date64Type, Date32Type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  // Widens int32 values to int64 and multiplies each by kMillisecondsInDay
+  // (ShiftTime with is_multiply = true).
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     ShiftTime<int32_t, int64_t>(ctx, options, true, kMillisecondsInDay, input, output);
   }
 };
 
 template <>
 struct CastFunctor<Date32Type, Date64Type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  // Narrows int64 values to int32 by dividing by kMillisecondsInDay
+  // (ShiftTime with is_multiply = false). A non-null value that is not an exact
+  // multiple sets an Invalid status unless options.allow_time_truncate is set.
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     ShiftTime<int64_t, int32_t>(ctx, options, false, kMillisecondsInDay, input, output);
   }
 };
@@ -487,10 +487,11 @@ template <typename T>
 struct CastFunctor<
     T, DictionaryType,
     typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
-    const DictionaryArray& dict_array = static_cast<const DictionaryArray&>(input);
-    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type());
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
+    DictionaryArray dict_array(input.ShallowCopy());
+
+    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type);
     const DataType& values_type = *type.dictionary()->type();
     const FixedSizeBinaryArray& dictionary =
         static_cast<const FixedSizeBinaryArray&>(*type.dictionary());
@@ -558,10 +559,11 @@ Status UnpackBinaryDictionary(FunctionContext* ctx, const Array& indices,
 template <typename T>
 struct CastFunctor<T, DictionaryType,
                    typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
-    const DictionaryArray& dict_array = static_cast<const DictionaryArray&>(input);
-    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type());
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
+    DictionaryArray dict_array(input.ShallowCopy());
+
+    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type);
     const DataType& values_type = *type.dictionary()->type();
     const BinaryArray& dictionary = static_cast<const BinaryArray&>(*type.dictionary());
 
@@ -617,12 +619,13 @@ void UnpackPrimitiveDictionary(const Array& indices, const c_type* dictionary,
 template <typename T>
 struct CastFunctor<T, DictionaryType,
                    typename std::enable_if<IsNumeric<T>::value>::type> {
-  void operator()(FunctionContext* ctx, const CastOptions& options, const Array& input,
-                  ArrayData* output) {
+  void operator()(FunctionContext* ctx, const CastOptions& options,
+                  const ArrayData& input, ArrayData* output) {
     using c_type = typename T::c_type;
 
-    const DictionaryArray& dict_array = static_cast<const DictionaryArray&>(input);
-    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type());
+    DictionaryArray dict_array(input.ShallowCopy());
+
+    const DictionaryType& type = static_cast<const DictionaryType&>(*input.type);
     const DataType& values_type = *type.dictionary()->type();
 
     // Check if values and output type match
@@ -657,24 +660,23 @@ struct CastFunctor<T, DictionaryType,
 
 // ----------------------------------------------------------------------
 
-typedef std::function<void(FunctionContext*, const CastOptions& options, const Array&,
+// Type-erased cast implementation. It reads `input` and writes into the
+// caller-provided output ArrayData. Failures are reported through the
+// FunctionContext status, which CastKernel::Call checks with RETURN_IF_ERROR.
+typedef std::function<void(FunctionContext*, const CastOptions& options, const ArrayData&,
                            ArrayData*)>
     CastFunction;
 
-static Status AllocateIfNotPreallocated(FunctionContext* ctx, const Array& input,
+static Status AllocateIfNotPreallocated(FunctionContext* ctx, const ArrayData& input,
                                         bool can_pre_allocate_values, ArrayData* out) {
-  const int64_t length = input.length();
-
-  out->null_count = input.null_count();
+  const int64_t length = input.length;
+  out->null_count = input.null_count;
 
   // Propagate bitmap unless we are null type
-  std::shared_ptr<Buffer> validity_bitmap = input.data()->buffers[0];
-  if (input.type_id() == Type::NA) {
+  std::shared_ptr<Buffer> validity_bitmap = input.buffers[0];
+  if (input.type->id() == Type::NA) {
     int64_t bitmap_size = BitUtil::BytesForBits(length);
     RETURN_NOT_OK(ctx->Allocate(bitmap_size, &validity_bitmap));
     memset(validity_bitmap->mutable_data(), 0, bitmap_size);
-  } else if (input.offset() != 0) {
-    RETURN_NOT_OK(CopyBitmap(ctx->memory_pool(), validity_bitmap->data(), input.offset(),
+  } else if (input.offset != 0) {
+    RETURN_NOT_OK(CopyBitmap(ctx->memory_pool(), validity_bitmap->data(), input.offset,
                              length, &validity_bitmap));
   }
 
@@ -727,17 +729,28 @@ static Status AllocateIfNotPreallocated(FunctionContext* ctx, const Array& input
 class CastKernel : public UnaryKernel {
  public:
   CastKernel(const CastOptions& options, const CastFunction& func, bool is_zero_copy,
-             bool can_pre_allocate_values)
+             bool can_pre_allocate_values, const std::shared_ptr<DataType>& out_type)
       : options_(options),
         func_(func),
         is_zero_copy_(is_zero_copy),
-        can_pre_allocate_values_(can_pre_allocate_values) {}
+        can_pre_allocate_values_(can_pre_allocate_values),
+        out_type_(out_type) {}
+
+  Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) override {
+    ArrayData* result;
+
+    if (out->kind() == Datum::NONE) {
+      out->value = std::make_shared<ArrayData>(out_type_, input.length);
+    }
+
+    result = out->array().get();
 
-  Status Call(FunctionContext* ctx, const Array& input, ArrayData* out) override {
     if (!is_zero_copy_) {
-      RETURN_NOT_OK(AllocateIfNotPreallocated(ctx, input, can_pre_allocate_values_, out));
+      RETURN_NOT_OK(
+          AllocateIfNotPreallocated(ctx, input, can_pre_allocate_values_, result));
     }
-    func_(ctx, options_, input, out);
+    func_(ctx, options_, input, result);
+
     RETURN_IF_ERROR(ctx);
     return Status::OK();
   }
@@ -747,18 +760,19 @@ class CastKernel : public UnaryKernel {
   CastFunction func_;
   bool is_zero_copy_;
   bool can_pre_allocate_values_;
+  std::shared_ptr<DataType> out_type_;
 };
 
-#define CAST_CASE(InType, OutType)                                                  \
-  case OutType::type_id:                                                            \
-    is_zero_copy = is_zero_copy_cast<OutType, InType>::value;                       \
-    can_pre_allocate_values =                                                       \
-        !(!is_binary_like(InType::type_id) && is_binary_like(OutType::type_id));    \
-    func = [](FunctionContext* ctx, const CastOptions& options, const Array& input, \
-              ArrayData* out) {                                                     \
-      CastFunctor<OutType, InType> func;                                            \
-      func(ctx, options, input, out);                                               \
-    };                                                                              \
+// One `case` of the output-type switch used by GET_CAST_FUNCTION. It sets
+// is_zero_copy from is_zero_copy_cast<OutType, InType>. It disables value
+// preallocation when casting a non-binary-like input to a binary-like output.
+// It binds `func` to a lambda that invokes CastFunctor<OutType, InType>.
+#define CAST_CASE(InType, OutType)                                                      \
+  case OutType::type_id:                                                                \
+    is_zero_copy = is_zero_copy_cast<OutType, InType>::value;                           \
+    can_pre_allocate_values =                                                           \
+        !(!is_binary_like(InType::type_id) && is_binary_like(OutType::type_id));        \
+    func = [](FunctionContext* ctx, const CastOptions& options, const ArrayData& input, \
+              ArrayData* out) {                                                         \
+      CastFunctor<OutType, InType> func;                                                \
+      func(ctx, options, input, out);                                                   \
+    };                                                                                  \
     break;
 
 #define NUMERIC_CASES(FN, IN_TYPE) \
@@ -832,26 +846,26 @@ class CastKernel : public UnaryKernel {
   FN(IN_TYPE, FloatType);             \
   FN(IN_TYPE, DoubleType);            \
   FN(IN_TYPE, FixedSizeBinaryType);   \
-  FN(IN_TYPE, DecimalType);           \
+  FN(IN_TYPE, Decimal128Type);        \
   FN(IN_TYPE, BinaryType);            \
   FN(IN_TYPE, StringType);
 
-#define GET_CAST_FUNCTION(CASE_GENERATOR, InType)                                \
-  static std::unique_ptr<UnaryKernel> Get##InType##CastFunc(                     \
-      const std::shared_ptr<DataType>& out_type, const CastOptions& options) {   \
-    CastFunction func;                                                           \
-    bool is_zero_copy = false;                                                   \
-    bool can_pre_allocate_values = true;                                         \
-    switch (out_type->id()) {                                                    \
-      CASE_GENERATOR(CAST_CASE, InType);                                         \
-      default:                                                                   \
-        break;                                                                   \
-    }                                                                            \
-    if (func != nullptr) {                                                       \
-      return std::unique_ptr<UnaryKernel>(                                       \
-          new CastKernel(options, func, is_zero_copy, can_pre_allocate_values)); \
-    }                                                                            \
-    return nullptr;                                                              \
+// Defines Get<InType>CastFunc(out_type, options). It switches on out_type->id()
+// using CASE_GENERATOR(CAST_CASE, InType) and returns a CastKernel bound to the
+// selected functor and out_type. It returns nullptr when no case matches
+// (unsupported cast).
+#define GET_CAST_FUNCTION(CASE_GENERATOR, InType)                              \
+  static std::unique_ptr<UnaryKernel> Get##InType##CastFunc(                   \
+      const std::shared_ptr<DataType>& out_type, const CastOptions& options) { \
+    CastFunction func;                                                         \
+    bool is_zero_copy = false;                                                 \
+    bool can_pre_allocate_values = true;                                       \
+    switch (out_type->id()) {                                                  \
+      CASE_GENERATOR(CAST_CASE, InType);                                       \
+      default:                                                                 \
+        break;                                                                 \
+    }                                                                          \
+    if (func != nullptr) {                                                     \
+      return std::unique_ptr<UnaryKernel>(new CastKernel(                      \
+          options, func, is_zero_copy, can_pre_allocate_values, out_type));    \
+    }                                                                          \
+    return nullptr;                                                            \
   }
 
+// Expands to GetNullTypeCastFunc(out_type, options), covering NULL_CASES
 GET_CAST_FUNCTION(NULL_CASES, NullType);
@@ -912,18 +926,27 @@ Status GetCastFunction(const DataType& in_type, const std::shared_ptr<DataType>&
   return Status::OK();
 }
 
-Status Cast(FunctionContext* ctx, const Array& array,
+// Datum overload: look up the kernel for value.type() -> out_type, run it with
+// detail::InvokeUnaryArrayKernel (which collects a vector of result Datums) and
+// wrap those results via detail::WrapDatumsLike(value, ...). Presumably the
+// result mirrors the kind of `value`; confirm against WrapDatumsLike.
+Status Cast(FunctionContext* ctx, const Datum& value,
             const std::shared_ptr<DataType>& out_type, const CastOptions& options,
-            std::shared_ptr<Array>* out) {
+            Datum* out) {
   // Dynamic dispatch to obtain right cast function
   std::unique_ptr<UnaryKernel> func;
-  RETURN_NOT_OK(GetCastFunction(*array.type(), out_type, options, &func));
+  RETURN_NOT_OK(GetCastFunction(*value.type(), out_type, options, &func));
 
-  // Data structure for output
-  auto out_data = std::make_shared<ArrayData>(out_type, array.length());
+  std::vector<Datum> result;
+  RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, func.get(), value, &result));
 
-  RETURN_NOT_OK(func->Call(ctx, array, out_data.get()));
-  *out = MakeArray(out_data);
+  *out = detail::WrapDatumsLike(value, result);
+  return Status::OK();
+}
+
+// Array convenience overload: forwards to the Datum overload and unwraps the
+// result, which is expected (DCHECKed) to be a single ARRAY datum.
+Status Cast(FunctionContext* ctx, const Array& array,
+            const std::shared_ptr<DataType>& out_type, const CastOptions& options,
+            std::shared_ptr<Array>* out) {
+  Datum datum_out;
+  RETURN_NOT_OK(Cast(ctx, Datum(array.data()), out_type, options, &datum_out));
+  DCHECK_EQ(Datum::ARRAY, datum_out.kind());
+  *out = MakeArray(datum_out.array());
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/compute/cast.h b/cpp/src/arrow/compute/kernels/cast.h
similarity index 69%
rename from cpp/src/arrow/compute/cast.h
rename to cpp/src/arrow/compute/kernels/cast.h
index d7bde20..b75bb7b 100644
--- a/cpp/src/arrow/compute/cast.h
+++ b/cpp/src/arrow/compute/kernels/cast.h
@@ -15,25 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_COMPUTE_CAST_H
-#define ARROW_COMPUTE_CAST_H
+#ifndef ARROW_COMPUTE_KERNELS_CAST_H
+#define ARROW_COMPUTE_KERNELS_CAST_H
 
 #include <memory>
 
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 
+#include "arrow/compute/kernel.h"
+
 namespace arrow {
 
 class Array;
+class ChunkedArray;
+class Column;
 class DataType;
 
 namespace compute {
 
-class FunctionContext;
-class UnaryKernel;
-
-struct CastOptions {
+struct ARROW_EXPORT CastOptions {
   CastOptions() : allow_int_overflow(false), allow_time_truncate(false) {}
 
   bool allow_int_overflow;
@@ -48,7 +49,7 @@ Status GetCastFunction(const DataType& in_type, const std::shared_ptr<DataType>&
 
 /// \brief Cast from one array type to another
 /// \param[in] context the FunctionContext
-/// \param[in] array array to cast
+/// \param[in] value array to cast
 /// \param[in] to_type type to cast to
 /// \param[in] options casting options
 /// \param[out] out resulting array
@@ -56,11 +57,25 @@ Status GetCastFunction(const DataType& in_type, const std::shared_ptr<DataType>&
 /// \since 0.7.0
 /// \note API not yet finalized
 ARROW_EXPORT
-Status Cast(FunctionContext* context, const Array& array,
+Status Cast(FunctionContext* context, const Array& value,
             const std::shared_ptr<DataType>& to_type, const CastOptions& options,
             std::shared_ptr<Array>* out);
 
+/// \brief Cast a Datum from one type to another
+/// \param[in] context the FunctionContext
+/// \param[in] value datum to cast; its type() selects the cast kernel
+/// \param[in] to_type type to cast to
+/// \param[in] options casting options
+/// \param[out] out resulting datum
+///
+/// \since 0.8.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Status Cast(FunctionContext* context, const Datum& value,
+            const std::shared_ptr<DataType>& to_type, const CastOptions& options,
+            Datum* out);
+
 }  // namespace compute
 }  // namespace arrow
 
-#endif  // ARROW_COMPUTE_CAST_H
+#endif  // ARROW_COMPUTE_KERNELS_CAST_H
diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc
new file mode 100644
index 0000000..3af4160
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/hash.cc
@@ -0,0 +1,822 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/hash.h"
+
+#include <algorithm>
+#include <exception>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/builder.h"
+#include "arrow/compute/context.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/kernels/util-internal.h"
+#include "arrow/util/hash-util.h"
+
+namespace arrow {
+namespace compute {
+
+namespace {
+
+// Number of elements allocated on first initialization: 1 << 10 = 1024
+static constexpr int64_t kInitialHashTableSize = 1 << 10;
+
+// Type of one hash table slot: an index into the observed distinct values
+typedef int32_t hash_slot_t;
+
+// Sentinel for an unused slot. Derived from hash_slot_t (not a hard-coded
+// int32_t) so it stays correct if the slot type ever changes.
+static constexpr hash_slot_t kHashSlotEmpty = std::numeric_limits<hash_slot_t>::max();
+
+// The maximum load factor for the hash table before resizing.
+static constexpr double kMaxHashTableLoad = 0.7;
+
+enum class SIMDMode : char { NOSIMD, SSE4, AVX2 };
+
+// Returns Status::NotImplemented from the enclosing function when KERNEL is
+// falsy. FUNCNAME is streamed into the message. TYPE must be a pointer-like
+// DataType handle exposing ToString(). (Previously TYPE was ignored and the
+// caller's local `type` was captured implicitly.)
+#define CHECK_IMPLEMENTED(KERNEL, FUNCNAME, TYPE)                    \
+  if (!(KERNEL)) {                                                   \
+    std::stringstream ss;                                            \
+    ss << FUNCNAME << " not implemented for " << (TYPE)->ToString(); \
+    return Status::NotImplemented(ss.str());                         \
+  }
+
+// Allocates from `pool` a buffer holding `size` hash slots, sets every slot to
+// kHashSlotEmpty, and stores the buffer in *out. *out is untouched on failure.
+Status NewHashTable(int64_t size, MemoryPool* pool, std::shared_ptr<Buffer>* out) {
+  auto hash_table = std::make_shared<PoolBuffer>(pool);
+
+  RETURN_NOT_OK(hash_table->Resize(sizeof(hash_slot_t) * size));
+  // Use the slot typedef rather than a hard-coded int32_t for the typed view
+  hash_slot_t* slots = reinterpret_cast<hash_slot_t*>(hash_table->mutable_data());
+  std::fill(slots, slots + size, kHashSlotEmpty);
+
+  *out = hash_table;
+  return Status::OK();
+}
+
+// This is a slight design concession -- some hash actions have the possibility
+// of failure. Rather than introduce extra error checking into all actions, we
+// will raise an internal exception so that only the actions where errors can
+// occur will experience the extra overhead
+class HashException : public std::exception {
+ public:
+  /// \param msg text returned by what()
+  /// \param code StatusCode reported by code(); defaults to Invalid
+  explicit HashException(const std::string& msg, StatusCode code = StatusCode::Invalid)
+      : msg_(msg), code_(code) {}
+
+  // noexcept replaces the dynamic exception specification throw(), which is
+  // deprecated since C++11 and removed in C++20
+  ~HashException() noexcept override {}
+
+  const char* what() const noexcept override;
+
+  StatusCode code() const { return code_; }
+
+ private:
+  std::string msg_;
+  StatusCode code_;
+};
+
+const char* HashException::what() const noexcept { return msg_.c_str(); }
+
+// Abstract base for the stateful hash kernels. It holds a data type, a memory
+// pool, and a table of hash_slot_t slots whose entries index into the set of
+// observed distinct values.
+class HashTable {
+ public:
+  HashTable(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : type_(type),
+        pool_(pool),
+        initialized_(false),
+        hash_table_(nullptr),
+        hash_slots_(nullptr),
+        hash_table_size_(0),
+        hash_table_load_threshold_(0),
+        mod_bitmask_(0) {}
+
+  virtual ~HashTable() {}
+
+  /// \brief Process one chunk of input values
+  virtual Status Append(const ArrayData& input) = 0;
+  /// \brief Write the kernel's result to *out (semantics defined by subclasses)
+  virtual Status Flush(Datum* out) = 0;
+  /// \brief Return the dictionary of distinct values observed so far
+  virtual Status GetDictionary(std::shared_ptr<ArrayData>* out) = 0;
+
+ protected:
+  /// \brief Allocate `elements` empty slots; `elements` must be a power of 2
+  Status Init(int64_t elements);
+
+  std::shared_ptr<DataType> type_;
+  MemoryPool* pool_;
+  // True once HashTable::Init() has allocated the table
+  bool initialized_;
+
+  // The hash table contains integer indices that reference the set of observed
+  // distinct values
+  std::shared_ptr<Buffer> hash_table_;
+  // Typed view over hash_table_'s data
+  hash_slot_t* hash_slots_;
+
+  /// Size of the table. Must be a power of 2.
+  int64_t hash_table_size_;
+
+  /// Size at which we decide to resize (hash_table_size_ * kMaxHashTableLoad).
+  /// Zero until Init() runs; previously left uninitialized by the constructor.
+  int64_t hash_table_load_threshold_;
+
+  // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
+  // hash_table_size_, but uses far fewer CPU cycles
+  int64_t mod_bitmask_;
+};
+
+// Allocates a table of `elements` empty slots (must be a power of two). On
+// success it derives the probe mask and the resize threshold and marks the
+// table initialized. On allocation failure no member besides the untouched
+// hash_table_ has been modified.
+Status HashTable::Init(int64_t elements) {
+  DCHECK_EQ(elements, BitUtil::NextPower2(elements));
+  RETURN_NOT_OK(NewHashTable(elements, pool_, &hash_table_));
+
+  hash_table_size_ = elements;
+  mod_bitmask_ = hash_table_size_ - 1;
+
+  const double max_load = static_cast<double>(hash_table_size_) * kMaxHashTableLoad;
+  hash_table_load_threshold_ = static_cast<int64_t>(max_load);
+
+  hash_slots_ = reinterpret_cast<hash_slot_t*>(hash_table_->mutable_data());
+  initialized_ = true;
+  return Status::OK();
+}
+
+// Primary template. Concrete kernels come from partial specializations
+// selected through the Enable parameter (e.g. enable_if_null<Type>).
+template <typename Type, typename Action, typename Enable = void>
+class HashTableKernel : public HashTable {};
+
+// Types of hash actions
+//
+// unique: append to dictionary when not found, no-op with slot
+// dictionary-encode: append to dictionary when not found, append slot #
+// match: raise or set null when not found, otherwise append slot #
+// isin: set false when not found, otherwise true
+// value counts: append to dictionary when not found, increment count for slot
+
+// Primary template for the store of distinct values, specialized per type
+// family via Enable (e.g. enable_if_has_c_type<Type>).
+// NOTE(review): the c_type specialization is declared `struct` while this
+// primary is `class`; legal C++, but MSVC may warn (C4099) -- consider unifying.
+template <typename Type, typename Enable = void>
+class HashDictionary {};
+
+// ----------------------------------------------------------------------
+// Hash table pass for nulls
+
+// NullType specialization: no hash table is ever built. Each input element is
+// reported to the Action as a null.
+template <typename Type, typename Action>
+class HashTableKernel<Type, Action, enable_if_null<Type>> : public HashTable {
+ public:
+  using HashTable::HashTable;
+
+  // Nothing to allocate for null-typed input.
+  // NOTE(review): initialized_ is never set here, so Append calls this no-op
+  // on every chunk.
+  Status Init() { return Status::OK(); }
+
+  Status Append(const ArrayData& arr) override {
+    if (!initialized_) {
+      RETURN_NOT_OK(Init());
+    }
+    Action* const action = static_cast<Action*>(this);
+    RETURN_NOT_OK(action->Reserve(arr.length));
+    // One ObserveNull() per element; the element index itself is not needed
+    for (int64_t remaining = arr.length; remaining > 0; --remaining) {
+      action->ObserveNull();
+    }
+    return Status::OK();
+  }
+
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    // TODO(wesm): handle null being a valid dictionary value
+    *out = NullArray(0).data();
+    return Status::OK();
+  }
+};
+
+// ----------------------------------------------------------------------
+// Hash table pass for primitive types
+
+// Growable array holding the distinct values of a C-typed hash kernel. Entry
+// k is the k-th distinct value inserted, which is also the number stored in
+// the hash slot that refers to it.
+template <typename Type>
+struct HashDictionary<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+
+  explicit HashDictionary(MemoryPool* pool)
+      : pool(pool),
+        buffer(std::make_shared<PoolBuffer>(pool)),
+        values(nullptr),
+        size(0),
+        capacity(0) {}
+
+  // Reset to an empty dictionary with room for kInitialHashTableSize values
+  Status Init() {
+    this->size = 0;
+    return Resize(kInitialHashTableSize);
+  }
+
+  // Double the *capacity*. Doubling `size` (as before) would shrink the
+  // buffer whenever size < capacity / 2 and resize it to zero when empty.
+  Status DoubleSize() {
+    return Resize(this->capacity > 0 ? this->capacity * 2 : kInitialHashTableSize);
+  }
+
+  // Set the capacity to `elements` values. `values` is refreshed because the
+  // resize may move the underlying allocation.
+  Status Resize(const int64_t elements) {
+    RETURN_NOT_OK(this->buffer->Resize(elements * sizeof(T)));
+
+    this->capacity = elements;
+    this->values = reinterpret_cast<T*>(this->buffer->mutable_data());
+    return Status::OK();
+  }
+
+  MemoryPool* pool;
+  std::shared_ptr<ResizableBuffer> buffer;
+  // Typed view of buffer->mutable_data(); null until the first Resize()
+  T* values;
+  // Number of distinct values stored
+  int64_t size;
+  // Number of values the buffer can currently hold
+  int64_t capacity;
+};
+
+// Shared driver loop for the Append() methods below. It expands inside a
+// member function and expects `arr` (the input ArrayData) and `action` (the
+// Action pointer) to be in scope. For each index i in [0, arr.length) it
+// either reports a null via action->ObserveNull() or expands
+// HASH_INNER_LOOP(), which processes the value at index i. The validity
+// bitmap is read only when arr.null_count != 0. HASH_INNER_LOOP may return
+// from the enclosing function (its bodies use RETURN_NOT_OK).
+#define GENERIC_HASH_PASS(HASH_INNER_LOOP)                                               \
+  if (arr.null_count != 0) {                                                             \
+    internal::BitmapReader valid_reader(arr.buffers[0]->data(), arr.offset, arr.length); \
+    for (int64_t i = 0; i < arr.length; ++i) {                                           \
+      const bool is_null = valid_reader.IsNotSet();                                      \
+      valid_reader.Next();                                                               \
+                                                                                         \
+      if (is_null) {                                                                     \
+        action->ObserveNull();                                                           \
+        continue;                                                                        \
+      }                                                                                  \
+                                                                                         \
+      HASH_INNER_LOOP();                                                                 \
+    }                                                                                    \
+  } else {                                                                               \
+    for (int64_t i = 0; i < arr.length; ++i) {                                           \
+      HASH_INNER_LOOP();                                                                 \
+    }                                                                                    \
+  }
+
+// Shared body of the kernels' DoubleTableSize(): rehash every occupied slot
+// into a table twice as large, then swap it in and recompute the derived
+// fields. SETUP_CODE runs once before rehashing (e.g. to cache dictionary
+// pointers). COMPUTE_HASH must declare `int64_t j`, the starting probe
+// position for dictionary entry `index`, masked with `new_mod_bitmask`.
+//
+// Linear probing in the new table must wrap at `new_size`: j can start
+// anywhere in [0, new_size), so wrapping at the old size would let j walk
+// past the end of the new slot array.
+#define DOUBLE_TABLE_SIZE(SETUP_CODE, COMPUTE_HASH)                              \
+  do {                                                                           \
+    int64_t new_size = hash_table_size_ * 2;                                     \
+                                                                                 \
+    std::shared_ptr<Buffer> new_hash_table;                                      \
+    RETURN_NOT_OK(NewHashTable(new_size, pool_, &new_hash_table));               \
+    hash_slot_t* new_hash_slots =                                                \
+        reinterpret_cast<hash_slot_t*>(new_hash_table->mutable_data());          \
+    int64_t new_mod_bitmask = new_size - 1;                                      \
+                                                                                 \
+    SETUP_CODE;                                                                  \
+                                                                                 \
+    for (int64_t i = 0; i < hash_table_size_; ++i) {                             \
+      hash_slot_t index = hash_slots_[i];                                        \
+                                                                                 \
+      if (index == kHashSlotEmpty) {                                             \
+        continue;                                                                \
+      }                                                                          \
+                                                                                 \
+      COMPUTE_HASH;                                                              \
+      while (kHashSlotEmpty != new_hash_slots[j]) {                              \
+        ++j;                                                                     \
+        if (ARROW_PREDICT_FALSE(j == new_size)) {                                \
+          j = 0;                                                                 \
+        }                                                                        \
+      }                                                                          \
+                                                                                 \
+      new_hash_slots[j] = index;                                                 \
+    }                                                                            \
+                                                                                 \
+    hash_table_ = new_hash_table;                                                \
+    hash_slots_ = reinterpret_cast<hash_slot_t*>(hash_table_->mutable_data());   \
+    hash_table_size_ = new_size;                                                 \
+    hash_table_load_threshold_ =                                                 \
+        static_cast<int64_t>(static_cast<double>(new_size) * kMaxHashTableLoad); \
+    mod_bitmask_ = new_size - 1;                                                 \
+  } while (false)
+
+// Hash kernel for types with a C `c_type` (enable_if_has_c_type). Distinct
+// values live in `dict_`; each occupied hash slot stores the dictionary index
+// of the value that hashed there.
+template <typename Type, typename Action>
+class HashTableKernel<Type, Action, enable_if_has_c_type<Type>> : public HashTable {
+ public:
+  using T = typename Type::c_type;
+
+  HashTableKernel(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : HashTable(type, pool), dict_(pool) {}
+
+  // Allocate the dictionary and the slot array with the same initial size
+  Status Init() {
+    RETURN_NOT_OK(dict_.Init());
+    return HashTable::Init(kInitialHashTableSize);
+  }
+
+  // Hash every value of `arr`, reporting each one to the Action as null,
+  // found (already in the dictionary) or not found (newly inserted)
+  Status Append(const ArrayData& arr) override {
+    if (!initialized_) {
+      RETURN_NOT_OK(Init());
+    }
+
+    const T* values = GetValues<T>(arr, 1);
+    auto action = static_cast<Action*>(this);
+
+    RETURN_NOT_OK(action->Reserve(arr.length));
+
+// Linear probing: start at HashValue(value) & mod_bitmask_ and advance
+// (wrapping at the table size) until reaching an empty slot or a slot whose
+// dictionary entry equals `value`. An empty slot means the value is new: it is
+// appended to dict_ and its index stored in the slot. Actions without
+// allow_expand throw HashException instead; HashKernelImpl converts that to a
+// Status. The table is doubled once dict_.size exceeds the load threshold, and
+// DoubleTableSize() resizes dict_ to the new table size, which keeps the
+// dict_.values store in bounds (assuming kMaxHashTableLoad < 1 -- confirm).
+#define HASH_INNER_LOOP()                                               \
+  const T value = values[i];                                            \
+  int64_t j = HashValue(value) & mod_bitmask_;                          \
+  hash_slot_t slot = hash_slots_[j];                                    \
+                                                                        \
+  while (kHashSlotEmpty != slot && dict_.values[slot] != value) {       \
+    ++j;                                                                \
+    if (ARROW_PREDICT_FALSE(j == hash_table_size_)) {                   \
+      j = 0;                                                            \
+    }                                                                   \
+    slot = hash_slots_[j];                                              \
+  }                                                                     \
+                                                                        \
+  if (slot == kHashSlotEmpty) {                                         \
+    if (!Action::allow_expand) {                                        \
+      throw HashException("Encountered new dictionary value");          \
+    }                                                                   \
+                                                                        \
+    slot = static_cast<hash_slot_t>(dict_.size);                        \
+    hash_slots_[j] = slot;                                              \
+    dict_.values[dict_.size++] = value;                                 \
+                                                                        \
+    action->ObserveNotFound(slot);                                      \
+                                                                        \
+    if (ARROW_PREDICT_FALSE(dict_.size > hash_table_load_threshold_)) { \
+      RETURN_NOT_OK(action->DoubleSize());                              \
+    }                                                                   \
+  } else {                                                              \
+    action->ObserveFound(slot);                                         \
+  }
+
+    GENERIC_HASH_PASS(HASH_INNER_LOOP);
+
+#undef HASH_INNER_LOOP
+
+    return Status::OK();
+  }
+
+  // Expose the distinct values (in insertion order) as an ArrayData that
+  // shares dict_'s buffer after shrinking its logical size to dict_.size
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    // TODO(wesm): handle null being in the dictionary
+    auto dict_data = dict_.buffer;
+    RETURN_NOT_OK(dict_data->Resize(dict_.size * sizeof(T), false));
+
+    BufferVector buffers = {nullptr, dict_data};
+    *out = std::make_shared<ArrayData>(type_, dict_.size, std::move(buffers), 0);
+    return Status::OK();
+  }
+
+ protected:
+  // Hash of the value's raw bytes
+  int64_t HashValue(const T& value) const {
+    // TODO(wesm): Use faster hash function for C types
+    return HashUtil::Hash(&value, sizeof(T), 0);
+  }
+
+  // Double the slot array (rehashing from dict_ values), then grow dict_ to
+  // match the new table size
+  Status DoubleTableSize() {
+#define PRIMITIVE_INNER_LOOP           \
+  const T value = dict_.values[index]; \
+  int64_t j = HashValue(value) & new_mod_bitmask;
+
+    DOUBLE_TABLE_SIZE(, PRIMITIVE_INNER_LOOP);
+
+#undef PRIMITIVE_INNER_LOOP
+
+    return dict_.Resize(hash_table_size_);
+  }
+
+  HashDictionary<Type> dict_;
+};
+
+// ----------------------------------------------------------------------
+// Hash table pass for variable-length binary types
+
+// Hash kernel for variable-length binary types (enable_if_binary). The
+// dictionary is kept as an offsets builder plus a data builder: entry k spans
+// bytes [dict_offsets_[k], dict_offsets_[k + 1]) of dict_data_.
+template <typename Type, typename Action>
+class HashTableKernel<Type, Action, enable_if_binary<Type>> : public HashTable {
+ public:
+  HashTableKernel(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : HashTable(type, pool), dict_offsets_(pool), dict_data_(pool), dict_size_(0) {}
+
+  Status Init() {
+    RETURN_NOT_OK(dict_offsets_.Resize(kInitialHashTableSize));
+
+    // We append the end offset after each append to the dictionary, so this
+    // sets the initial condition for the length-0 case
+    //
+    // initial offsets (dict size == 0): 0
+    // after 1st dict entry of length 3: 0 3
+    // after 2nd dict entry of length 4: 0 3 7
+    RETURN_NOT_OK(dict_offsets_.Append(0));
+    return HashTable::Init(kInitialHashTableSize);
+  }
+
+  // Hash every value of `arr`, reporting each one to the Action as null,
+  // found (already in the dictionary) or not found (newly inserted)
+  Status Append(const ArrayData& arr) override {
+    if (!initialized_) {
+      RETURN_NOT_OK(Init());
+    }
+
+    // GetValues shifts the offsets pointer by the slice offset, so offsets[i]
+    // belongs to element i of `arr`. The offset values themselves are
+    // absolute positions into the data buffer, so the data pointer must NOT
+    // also be shifted by arr.offset; doing so hashed the wrong bytes for
+    // sliced arrays.
+    const int32_t* offsets = GetValues<int32_t>(arr, 1);
+    const uint8_t* data = arr.buffers[2]->data();
+
+    auto action = static_cast<Action*>(this);
+    RETURN_NOT_OK(action->Reserve(arr.length));
+
+// Linear probing: a slot matches when its dictionary entry has the same
+// length and bytes as `value`. The dictionary pointers are re-read for every
+// element because appending a new entry may reallocate the builders.
+#define HASH_INNER_LOOP()                                                           \
+  const int32_t position = offsets[i];                                              \
+  const int32_t length = offsets[i + 1] - position;                                 \
+  const uint8_t* value = data + position;                                           \
+                                                                                    \
+  int64_t j = HashValue(value, length) & mod_bitmask_;                              \
+  hash_slot_t slot = hash_slots_[j];                                                \
+                                                                                    \
+  const int32_t* dict_offsets = dict_offsets_.data();                               \
+  const uint8_t* dict_data = dict_data_.data();                                     \
+  while (kHashSlotEmpty != slot &&                                                  \
+         !((dict_offsets[slot + 1] - dict_offsets[slot]) == length &&               \
+           0 == memcmp(value, dict_data + dict_offsets[slot], length))) {           \
+    ++j;                                                                            \
+    if (ARROW_PREDICT_FALSE(j == hash_table_size_)) {                               \
+      j = 0;                                                                        \
+    }                                                                               \
+    slot = hash_slots_[j];                                                          \
+  }                                                                                 \
+                                                                                    \
+  if (slot == kHashSlotEmpty) {                                                     \
+    if (!Action::allow_expand) {                                                    \
+      throw HashException("Encountered new dictionary value");                      \
+    }                                                                               \
+                                                                                    \
+    slot = dict_size_++;                                                            \
+    hash_slots_[j] = slot;                                                          \
+                                                                                    \
+    RETURN_NOT_OK(dict_data_.Append(value, length));                                \
+    RETURN_NOT_OK(dict_offsets_.Append(static_cast<int32_t>(dict_data_.length()))); \
+                                                                                    \
+    action->ObserveNotFound(slot);                                                  \
+                                                                                    \
+    if (ARROW_PREDICT_FALSE(dict_size_ > hash_table_load_threshold_)) {             \
+      RETURN_NOT_OK(action->DoubleSize());                                          \
+    }                                                                               \
+  } else {                                                                          \
+    action->ObserveFound(slot);                                                     \
+  }
+
+    GENERIC_HASH_PASS(HASH_INNER_LOOP);
+
+#undef HASH_INNER_LOOP
+
+    return Status::OK();
+  }
+
+  // Finish the offsets and data builders into the dictionary's buffers
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    // TODO(wesm): handle null being in the dictionary
+    BufferVector buffers = {nullptr, nullptr, nullptr};
+
+    RETURN_NOT_OK(dict_offsets_.Finish(&buffers[1]));
+    RETURN_NOT_OK(dict_data_.Finish(&buffers[2]));
+
+    *out = std::make_shared<ArrayData>(type_, dict_size_, std::move(buffers), 0);
+    return Status::OK();
+  }
+
+ protected:
+  int64_t HashValue(const uint8_t* data, int32_t length) const {
+    return HashUtil::Hash(data, length, 0);
+  }
+
+  // Double the slot array, rehashing each entry from the dictionary bytes.
+  // Only the slot array is resized here; the builders grow as entries are
+  // appended.
+  Status DoubleTableSize() {
+#define VARBYTES_SETUP                                \
+  const int32_t* dict_offsets = dict_offsets_.data(); \
+  const uint8_t* dict_data = dict_data_.data()
+
+#define VARBYTES_COMPUTE_HASH                                           \
+  const int32_t length = dict_offsets[index + 1] - dict_offsets[index]; \
+  const uint8_t* value = dict_data + dict_offsets[index];               \
+  int64_t j = HashValue(value, length) & new_mod_bitmask
+
+    DOUBLE_TABLE_SIZE(VARBYTES_SETUP, VARBYTES_COMPUTE_HASH);
+
+#undef VARBYTES_SETUP
+#undef VARBYTES_COMPUTE_HASH
+
+    return Status::OK();
+  }
+
+  TypedBufferBuilder<int32_t> dict_offsets_;
+  TypedBufferBuilder<uint8_t> dict_data_;
+  int32_t dict_size_;
+};
+
+// ----------------------------------------------------------------------
+// Hash table pass for fixed size binary types
+
+// Hash kernel for fixed-size binary types (enable_if_fixed_size_binary).
+// Dictionary entry k occupies bytes [k * byte_width_, (k + 1) * byte_width_)
+// of dict_data_.
+template <typename Type, typename Action>
+class HashTableKernel<Type, Action, enable_if_fixed_size_binary<Type>>
+    : public HashTable {
+ public:
+  HashTableKernel(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : HashTable(type, pool), dict_data_(pool), dict_size_(0) {
+    const auto& fw_type = static_cast<const FixedSizeBinaryType&>(*type);
+    byte_width_ = fw_type.bit_width() / 8;
+  }
+
+  Status Init() {
+    RETURN_NOT_OK(dict_data_.Resize(kInitialHashTableSize * byte_width_));
+    return HashTable::Init(kInitialHashTableSize);
+  }
+
+  // Hash every value of `arr`, reporting each one to the Action as null,
+  // found (already in the dictionary) or not found (newly inserted)
+  Status Append(const ArrayData& arr) override {
+    if (!initialized_) {
+      RETURN_NOT_OK(Init());
+    }
+
+    // Each element is byte_width_ bytes wide, so the slice offset has to be
+    // scaled by the width. Applying arr.offset in units of uint8_t would skip
+    // only arr.offset bytes and misread every sliced array with byte_width_ > 1.
+    const uint8_t* data = arr.buffers[1]->data() + arr.offset * byte_width_;
+
+    auto action = static_cast<Action*>(this);
+    RETURN_NOT_OK(action->Reserve(arr.length));
+
+// Linear probing: a slot matches when its dictionary entry has the same
+// bytes as `value`. dict_data_.data() is re-read for every element because
+// appending a new entry may reallocate the builder.
+#define HASH_INNER_LOOP()                                                      \
+  const uint8_t* value = data + i * byte_width_;                               \
+  int64_t j = HashValue(value) & mod_bitmask_;                                 \
+  hash_slot_t slot = hash_slots_[j];                                           \
+                                                                               \
+  const uint8_t* dict_data = dict_data_.data();                                \
+  while (kHashSlotEmpty != slot &&                                             \
+         !(0 == memcmp(value, dict_data + slot * byte_width_, byte_width_))) { \
+    ++j;                                                                       \
+    if (ARROW_PREDICT_FALSE(j == hash_table_size_)) {                          \
+      j = 0;                                                                   \
+    }                                                                          \
+    slot = hash_slots_[j];                                                     \
+  }                                                                            \
+                                                                               \
+  if (slot == kHashSlotEmpty) {                                                \
+    if (!Action::allow_expand) {                                               \
+      throw HashException("Encountered new dictionary value");                 \
+    }                                                                          \
+                                                                               \
+    slot = dict_size_++;                                                       \
+    hash_slots_[j] = slot;                                                     \
+                                                                               \
+    RETURN_NOT_OK(dict_data_.Append(value, byte_width_));                      \
+                                                                               \
+    action->ObserveNotFound(slot);                                             \
+                                                                               \
+    if (ARROW_PREDICT_FALSE(dict_size_ > hash_table_load_threshold_)) {        \
+      RETURN_NOT_OK(action->DoubleSize());                                     \
+    }                                                                          \
+  } else {                                                                     \
+    action->ObserveFound(slot);                                                \
+  }
+
+    GENERIC_HASH_PASS(HASH_INNER_LOOP);
+
+#undef HASH_INNER_LOOP
+
+    return Status::OK();
+  }
+
+  // Finish the data builder into the dictionary's value buffer
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    // TODO(wesm): handle null being in the dictionary
+    BufferVector buffers = {nullptr, nullptr};
+    RETURN_NOT_OK(dict_data_.Finish(&buffers[1]));
+
+    *out = std::make_shared<ArrayData>(type_, dict_size_, std::move(buffers), 0);
+    return Status::OK();
+  }
+
+ protected:
+  int64_t HashValue(const uint8_t* data) const {
+    return HashUtil::Hash(data, byte_width_, 0);
+  }
+
+  // Double the slot array, rehashing each entry from the dictionary bytes
+  Status DoubleTableSize() {
+#define FIXED_BYTES_SETUP const uint8_t* dict_data = dict_data_.data()
+
+#define FIXED_BYTES_COMPUTE_HASH \
+  int64_t j = HashValue(dict_data + index * byte_width_) & new_mod_bitmask
+
+    DOUBLE_TABLE_SIZE(FIXED_BYTES_SETUP, FIXED_BYTES_COMPUTE_HASH);
+
+#undef FIXED_BYTES_SETUP
+#undef FIXED_BYTES_COMPUTE_HASH
+
+    return Status::OK();
+  }
+
+  int32_t byte_width_;
+  TypedBufferBuilder<uint8_t> dict_data_;
+  int32_t dict_size_;
+};
+
+// ----------------------------------------------------------------------
+// Unique implementation
+
+// Action for the "unique" kernel. The distinct values collected in the hash
+// table's dictionary are the whole result, so the observation hooks record
+// nothing and Flush() produces no output.
+template <typename Type>
+class UniqueImpl : public HashTableKernel<Type, UniqueImpl<Type>> {
+ public:
+  using Base = HashTableKernel<Type, UniqueImpl<Type>>;
+  using Base::Base;
+
+  static constexpr bool allow_expand = true;
+
+  Status Append(const ArrayData& input) override { return Base::Append(input); }
+
+  // The unique values are retrieved separately through GetDictionary()
+  Status Flush(Datum* out) override { return Status::OK(); }
+
+  // Hooks invoked by the HashTableKernel base; nothing needs recording
+  Status Reserve(const int64_t length) { return Status::OK(); }
+  void ObserveNull() {}
+  void ObserveFound(const hash_slot_t slot) {}
+  void ObserveNotFound(const hash_slot_t slot) {}
+
+  Status DoubleSize() { return Base::DoubleTableSize(); }
+};
+
+// ----------------------------------------------------------------------
+// Dictionary encode implementation
+
+// Action for the "dictionary-encode" kernel. Each non-null value is encoded
+// as the int32 slot number of its dictionary entry and each null becomes a
+// null index. Flush() hands back the indices accumulated so far.
+template <typename Type>
+class DictEncodeImpl : public HashTableKernel<Type, DictEncodeImpl<Type>> {
+ public:
+  using Base = HashTableKernel<Type, DictEncodeImpl>;
+  static constexpr bool allow_expand = true;
+
+  DictEncodeImpl(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : Base(type, pool), indices_builder_(pool) {}
+
+  using Base::Append;
+
+  // Reserve one index per input element up front; the hooks below rely on
+  // that and use the unchecked Unsafe* appends
+  Status Reserve(const int64_t length) { return indices_builder_.Reserve(length); }
+
+  void ObserveNull() { indices_builder_.UnsafeAppendToBitmap(false); }
+
+  // Existing and newly inserted values are encoded identically: by slot
+  void ObserveFound(const hash_slot_t slot) { indices_builder_.UnsafeAppend(slot); }
+  void ObserveNotFound(const hash_slot_t slot) { indices_builder_.UnsafeAppend(slot); }
+
+  Status DoubleSize() { return Base::DoubleTableSize(); }
+
+  Status Flush(Datum* out) override {
+    std::shared_ptr<ArrayData> indices;
+    RETURN_NOT_OK(indices_builder_.FinishInternal(&indices));
+    out->value = std::move(indices);
+    return Status::OK();
+  }
+
+ private:
+  Int32Builder indices_builder_;
+};
+
+// ----------------------------------------------------------------------
+// Kernel wrapper for generic hash table kernels
+
+// Adapts a HashTable (the type-specific kernel + action) to the HashKernel
+// interface. Every entry point holds `lock_` while touching the hasher, so
+// Flush()/GetDictionary() cannot race with a concurrent Append(), and Call()
+// performs its Append+Flush pair atomically. HashException thrown by
+// non-expanding actions is converted into a Status.
+class HashKernelImpl : public HashKernel {
+ public:
+  explicit HashKernelImpl(std::unique_ptr<HashTable> hasher)
+      : hasher_(std::move(hasher)) {}
+
+  Status Call(FunctionContext* ctx, const ArrayData& input, Datum* out) override {
+    std::lock_guard<std::mutex> guard(lock_);
+    RETURN_NOT_OK(AppendUnlocked(input));
+    return hasher_->Flush(out);
+  }
+
+  Status Append(FunctionContext* ctx, const ArrayData& input) override {
+    std::lock_guard<std::mutex> guard(lock_);
+    return AppendUnlocked(input);
+  }
+
+  Status Flush(Datum* out) override {
+    std::lock_guard<std::mutex> guard(lock_);
+    return hasher_->Flush(out);
+  }
+
+  Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
+    std::lock_guard<std::mutex> guard(lock_);
+    return hasher_->GetDictionary(out);
+  }
+
+ private:
+  // Append to the hasher; the caller must already hold lock_
+  Status AppendUnlocked(const ArrayData& input) {
+    try {
+      RETURN_NOT_OK(hasher_->Append(input));
+    } catch (const HashException& e) {
+      return Status(e.code(), e.what());
+    }
+    return Status::OK();
+  }
+
+  std::mutex lock_;
+  std::unique_ptr<HashTable> hasher_;
+};
+
+}  // namespace
+
+// Construct a "unique" hash kernel for arrays of `type`. Each supported type
+// gets its UniqueImpl instantiation; for any other type `impl` stays null and
+// CHECK_IMPLEMENTED presumably turns that into an error status.
+Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr<DataType>& type,
+                       std::unique_ptr<HashKernel>* out) {
+#define UNIQUE_IMPL_FOR(InType)                                   \
+  case InType::type_id:                                           \
+    impl.reset(new UniqueImpl<InType>(type, ctx->memory_pool())); \
+    break
+
+  std::unique_ptr<HashTable> impl;
+  switch (type->id()) {
+    UNIQUE_IMPL_FOR(NullType);
+    // Booleans are not supported yet:
+    // UNIQUE_IMPL_FOR(BooleanType);
+    UNIQUE_IMPL_FOR(Int8Type);
+    UNIQUE_IMPL_FOR(UInt8Type);
+    UNIQUE_IMPL_FOR(Int16Type);
+    UNIQUE_IMPL_FOR(UInt16Type);
+    UNIQUE_IMPL_FOR(Int32Type);
+    UNIQUE_IMPL_FOR(UInt32Type);
+    UNIQUE_IMPL_FOR(Int64Type);
+    UNIQUE_IMPL_FOR(UInt64Type);
+    UNIQUE_IMPL_FOR(FloatType);
+    UNIQUE_IMPL_FOR(DoubleType);
+    UNIQUE_IMPL_FOR(Date32Type);
+    UNIQUE_IMPL_FOR(Date64Type);
+    UNIQUE_IMPL_FOR(Time32Type);
+    UNIQUE_IMPL_FOR(Time64Type);
+    UNIQUE_IMPL_FOR(TimestampType);
+    UNIQUE_IMPL_FOR(BinaryType);
+    UNIQUE_IMPL_FOR(StringType);
+    UNIQUE_IMPL_FOR(FixedSizeBinaryType);
+    UNIQUE_IMPL_FOR(Decimal128Type);
+    default:
+      break;
+  }
+
+#undef UNIQUE_IMPL_FOR
+
+  CHECK_IMPLEMENTED(impl, "unique", type);
+  *out = std::unique_ptr<HashKernel>(new HashKernelImpl(std::move(impl)));
+  return Status::OK();
+}
+
+// Construct a dictionary-encoding hash kernel for arrays of `type`. Each
+// supported type gets its DictEncodeImpl instantiation; for any other type
+// `impl` stays null and CHECK_IMPLEMENTED presumably turns that into an error.
+Status GetDictionaryEncodeKernel(FunctionContext* ctx,
+                                 const std::shared_ptr<DataType>& type,
+                                 std::unique_ptr<HashKernel>* out) {
+#define DICT_ENCODE_IMPL_FOR(InType)                                  \
+  case InType::type_id:                                               \
+    impl.reset(new DictEncodeImpl<InType>(type, ctx->memory_pool())); \
+    break
+
+  std::unique_ptr<HashTable> impl;
+  switch (type->id()) {
+    DICT_ENCODE_IMPL_FOR(NullType);
+    // Booleans are not supported yet:
+    // DICT_ENCODE_IMPL_FOR(BooleanType);
+    DICT_ENCODE_IMPL_FOR(Int8Type);
+    DICT_ENCODE_IMPL_FOR(UInt8Type);
+    DICT_ENCODE_IMPL_FOR(Int16Type);
+    DICT_ENCODE_IMPL_FOR(UInt16Type);
+    DICT_ENCODE_IMPL_FOR(Int32Type);
+    DICT_ENCODE_IMPL_FOR(UInt32Type);
+    DICT_ENCODE_IMPL_FOR(Int64Type);
+    DICT_ENCODE_IMPL_FOR(UInt64Type);
+    DICT_ENCODE_IMPL_FOR(FloatType);
+    DICT_ENCODE_IMPL_FOR(DoubleType);
+    DICT_ENCODE_IMPL_FOR(Date32Type);
+    DICT_ENCODE_IMPL_FOR(Date64Type);
+    DICT_ENCODE_IMPL_FOR(Time32Type);
+    DICT_ENCODE_IMPL_FOR(Time64Type);
+    DICT_ENCODE_IMPL_FOR(TimestampType);
+    DICT_ENCODE_IMPL_FOR(BinaryType);
+    DICT_ENCODE_IMPL_FOR(StringType);
+    DICT_ENCODE_IMPL_FOR(FixedSizeBinaryType);
+    DICT_ENCODE_IMPL_FOR(Decimal128Type);
+    default:
+      break;
+  }
+
+#undef DICT_ENCODE_IMPL_FOR
+
+  CHECK_IMPLEMENTED(impl, "dictionary-encode", type);
+  *out = std::unique_ptr<HashKernel>(new HashKernelImpl(std::move(impl)));
+  return Status::OK();
+}
+
+namespace {
+
+// Run `func` over every array in `value` (collecting per-array outputs into
+// `kernel_outputs`), then read back the dictionary the kernel accumulated
+// across all of them.
+Status InvokeHash(FunctionContext* ctx, HashKernel* func, const Datum& value,
+                  std::vector<Datum>* kernel_outputs,
+                  std::shared_ptr<Array>* dictionary) {
+  RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, func, value, kernel_outputs));
+
+  std::shared_ptr<ArrayData> accumulated;
+  RETURN_NOT_OK(func->GetDictionary(&accumulated));
+  *dictionary = MakeArray(accumulated);
+  return Status::OK();
+}
+
+}  // namespace
+
+// The unique values are the kernel's dictionary; the per-array outputs of
+// the unique kernel carry nothing useful and are discarded.
+Status Unique(FunctionContext* ctx, const Datum& value, std::shared_ptr<Array>* out) {
+  std::unique_ptr<HashKernel> kernel;
+  RETURN_NOT_OK(GetUniqueKernel(ctx, value.type(), &kernel));
+
+  std::vector<Datum> discarded_outputs;
+  return InvokeHash(ctx, kernel.get(), value, &discarded_outputs, out);
+}
+
+// Dictionary-encode `value`: hash every array into one shared dictionary,
+// then wrap each array's indices as a DictionaryArray, shaped like the input.
+Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) {
+  std::unique_ptr<HashKernel> func;
+  RETURN_NOT_OK(GetDictionaryEncodeKernel(ctx, value.type(), &func));
+
+  std::shared_ptr<Array> dictionary;
+  std::vector<Datum> indices_outputs;
+  RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &indices_outputs, &dictionary));
+
+  // DictEncodeImpl always builds its indices with an Int32Builder, so the
+  // index type is int32. Taking it from indices_outputs[0] instead would read
+  // past the end of an empty vector when the input is a ChunkedArray with no
+  // chunks.
+  std::shared_ptr<DataType> dict_type =
+      ::arrow::dictionary(::arrow::int32(), dictionary);
+
+  // Create a DictionaryArray for each piece yielded by the kernel invocations
+  std::vector<std::shared_ptr<Array>> dict_chunks;
+  dict_chunks.reserve(indices_outputs.size());
+  for (const Datum& datum : indices_outputs) {
+    DCHECK_EQ(datum.kind(), Datum::ARRAY);
+    dict_chunks.emplace_back(
+        std::make_shared<DictionaryArray>(dict_type, MakeArray(datum.array())));
+  }
+
+  *out = detail::WrapArraysLike(value, dict_chunks);
+  return Status::OK();
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash.h b/cpp/src/arrow/compute/kernels/hash.h
new file mode 100644
index 0000000..05f2429
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/hash.h
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_COMPUTE_KERNELS_HASH_H
+#define ARROW_COMPUTE_KERNELS_HASH_H
+
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/kernel.h"
+#include "arrow/status.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+class FunctionContext;
+
+/// \brief Invoke hash table kernel on input array, returning any output
+/// values. Implementations should be thread-safe
+///
+/// The kernel is stateful: values from successive Append() calls accumulate
+/// into one hash table, so the dictionary spans every array appended so far.
+class ARROW_EXPORT HashKernel : public UnaryKernel {
+ public:
+  /// \brief Hash the values of one array into the kernel's table
+  virtual Status Append(FunctionContext* ctx, const ArrayData& input) = 0;
+  /// \brief Emit the output accumulated by the action since the last flush
+  /// (e.g. dictionary indices); kernels whose result is the dictionary itself
+  /// may leave `out` untouched
+  virtual Status Flush(Datum* out) = 0;
+  /// \brief Return the distinct values observed so far, in the order they
+  /// were first seen
+  virtual Status GetDictionary(std::shared_ptr<ArrayData>* out) = 0;
+};
+
+/// \brief Create a hash kernel that collects the distinct values of arrays
+/// of the given type; read them back with HashKernel::GetDictionary
+/// \param[in] ctx the FunctionContext
+/// \param[in] type value type of the arrays that will be appended
+/// \param[out] kernel the new kernel
+///
+/// \since 0.8.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr<DataType>& type,
+                       std::unique_ptr<HashKernel>* kernel);
+
+/// \brief Create a hash kernel that dictionary-encodes arrays of the given
+/// type: HashKernel::Flush yields int32 indices and GetDictionary the values
+/// \param[in] ctx the FunctionContext
+/// \param[in] type value type of the arrays that will be appended
+/// \param[out] kernel the new kernel
+ARROW_EXPORT
+Status GetDictionaryEncodeKernel(FunctionContext* ctx,
+                                 const std::shared_ptr<DataType>& type,
+                                 std::unique_ptr<HashKernel>* kernel);
+
+/// \brief Compute unique elements from an array-like object
+///
+/// Values appear in the order in which they are first encountered; nulls
+/// are not included in the result.
+/// \param[in] context the FunctionContext
+/// \param[in] datum array-like input (Array or ChunkedArray)
+/// \param[out] out result as Array
+///
+/// \since 0.8.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Status Unique(FunctionContext* context, const Datum& datum, std::shared_ptr<Array>* out);
+
+/// \brief Dictionary-encode values in an array-like object
+/// \param[in] context the FunctionContext
+/// \param[in] data array-like input (Array or ChunkedArray)
+/// \param[out] out result with the same shape as the input (one
+/// DictionaryArray per input array, all sharing one dictionary). The type is
+/// a dictionary type with int32 indices, not the input type; null inputs
+/// become null indices.
+///
+/// \since 0.8.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Status DictionaryEncode(FunctionContext* context, const Datum& data, Datum* out);
+
+// TODO(wesm): Define API for incremental dictionary encoding
+
+// TODO(wesm): Define API for regularizing DictionaryArray objects with
+// different dictionaries
+
+// class DictionaryEncoder {
+//  public:
+//   virtual Status Encode(const Datum& data, Datum* out) = 0;
+// };
+
+//
+// ARROW_EXPORT
+// Status DictionaryEncode(FunctionContext* context, const Datum& data,
+//                         const Array& prior_dictionary, Datum* out);
+
+// TODO(wesm): Implement these next
+// ARROW_EXPORT
+// Status Match(FunctionContext* context, const Datum& values, const Datum& member_set,
+//              Datum* out);
+
+// ARROW_EXPORT
+// Status IsIn(FunctionContext* context, const Datum& values, const Datum& member_set,
+//             Datum* out);
+
+// ARROW_EXPORT
+// Status CountValues(FunctionContext* context, const Datum& values,
+//                    std::shared_ptr<Array>* out_uniques,
+//                    std::shared_ptr<Array>* out_counts);
+
+}  // namespace compute
+}  // namespace arrow
+
+#endif  // ARROW_COMPUTE_KERNELS_HASH_H
diff --git a/cpp/src/arrow/compute/kernels/util-internal.cc b/cpp/src/arrow/compute/kernels/util-internal.cc
new file mode 100644
index 0000000..df68637
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/util-internal.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/util-internal.h"
+
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/context.h"
+#include "arrow/compute/kernel.h"
+
+namespace arrow {
+namespace compute {
+namespace detail {
+
+Status InvokeUnaryArrayKernel(FunctionContext* ctx, UnaryKernel* kernel,
+                              const Datum& value, std::vector<Datum>* outputs) {
+  // Apply `kernel` to every contiguous array piece of `value`: the array itself
+  // for ARRAY, each chunk in order for CHUNKED_ARRAY. One output Datum per
+  // piece is appended to `outputs`. If a call fails, the outputs of earlier
+  // pieces have already been appended.
+  switch (value.kind()) {
+    case Datum::ARRAY: {
+      Datum piece_out;
+      RETURN_NOT_OK(kernel->Call(ctx, *value.array(), &piece_out));
+      outputs->push_back(piece_out);
+      return Status::OK();
+    }
+    case Datum::CHUNKED_ARRAY: {
+      const ChunkedArray& chunked = *value.chunked_array();
+      const int num_chunks = chunked.num_chunks();
+      for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
+        Datum piece_out;
+        RETURN_NOT_OK(kernel->Call(ctx, *chunked.chunk(chunk_idx)->data(), &piece_out));
+        outputs->push_back(piece_out);
+      }
+      return Status::OK();
+    }
+    default:
+      return Status::Invalid("Input Datum was not array-like");
+  }
+}
+
+Datum WrapArraysLike(const Datum& value,
+                     const std::vector<std::shared_ptr<Array>>& arrays) {
+  // Re-wrap results so the returned Datum has the same kind as `value`: the
+  // single result Array for ARRAY input, a ChunkedArray over all results for
+  // CHUNKED_ARRAY input.
+  if (value.kind() == Datum::ARRAY) {
+    // An ARRAY input produces exactly one result; check before indexing [0]
+    // (mirrors the contract check in WrapDatumsLike)
+    DCHECK_EQ(1, arrays.size());
+    return Datum(arrays[0]->data());
+  } else if (value.kind() == Datum::CHUNKED_ARRAY) {
+    return Datum(std::make_shared<ChunkedArray>(arrays));
+  } else {
+    DCHECK(false) << "unhandled datum kind";
+    return Datum();
+  }
+}
+
+Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums) {
+  // Package per-piece kernel outputs (each expected to be ARRAY-kind) back into
+  // a Datum of the same kind as `value`. Any other kind of `value` trips a
+  // debug check and yields an empty Datum.
+  switch (value.kind()) {
+    case Datum::ARRAY: {
+      DCHECK_EQ(1, datums.size());
+      return Datum(datums[0].array());
+    }
+    case Datum::CHUNKED_ARRAY: {
+      std::vector<std::shared_ptr<Array>> chunks;
+      for (const Datum& piece : datums) {
+        DCHECK_EQ(Datum::ARRAY, piece.kind());
+        chunks.emplace_back(MakeArray(piece.array()));
+      }
+      return Datum(std::make_shared<ChunkedArray>(chunks));
+    }
+    default: {
+      DCHECK(false) << "unhandled datum kind";
+      return Datum();
+    }
+  }
+}
+
+}  // namespace detail
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/util-internal.h b/cpp/src/arrow/compute/kernels/util-internal.h
new file mode 100644
index 0000000..70c5062
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/util-internal.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
+#define ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
+
+#include <vector>
+
+#include "arrow/compute/kernel.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace compute {
+
+class FunctionContext;
+
+// Type-dispatch helpers for kernels. Each enable_if_* alias names `void` when
+// T satisfies its condition; otherwise the alias is ill-formed, which (via
+// SFINAE) removes the enclosing template candidate.
+// NOTE(review): std::is_base_of / std::enable_if come from <type_traits>, which
+// this header does not include directly (it relies on transitive includes).
+
+// True when T derives from Number
+template <typename T>
+using is_number = std::is_base_of<Number, T>;
+
+// T derives from PrimitiveCType
+template <typename T>
+using enable_if_primitive_ctype =
+    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value>::type;
+
+// T derives from DateType
+template <typename T>
+using enable_if_date = typename std::enable_if<std::is_base_of<DateType, T>::value>::type;
+
+// T derives from TimeType
+template <typename T>
+using enable_if_time = typename std::enable_if<std::is_base_of<TimeType, T>::value>::type;
+
+// T derives from TimestampType
+template <typename T>
+using enable_if_timestamp =
+    typename std::enable_if<std::is_base_of<TimestampType, T>::value>::type;
+
+// T derives from any of PrimitiveCType, DateType, TimeType or TimestampType
+// (presumably: the types whose values are stored as a plain C type -- confirm)
+template <typename T>
+using enable_if_has_c_type =
+    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
+                            std::is_base_of<DateType, T>::value ||
+                            std::is_base_of<TimeType, T>::value ||
+                            std::is_base_of<TimestampType, T>::value>::type;
+
+// T is exactly NullType (is_same, unlike the is_base_of checks above)
+template <typename T>
+using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value>::type;
+
+// T is BinaryType or derives from it
+template <typename T>
+using enable_if_binary =
+    typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type;
+
+// T is FixedSizeBinaryType or derives from it
+template <typename T>
+using enable_if_fixed_size_binary =
+    typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+
+// T is ListType or derives from it
+template <typename T>
+using enable_if_list = typename std::enable_if<std::is_base_of<ListType, T>::value>::type;
+
+// T satisfies is_number (derives from Number)
+template <typename T>
+using enable_if_number = typename std::enable_if<is_number<T>::value>::type;
+
+template <typename T>
+inline const T* GetValues(const ArrayData& data, int i) {
+  // Read-only typed pointer to the first logical element of buffer i. The
+  // array offset is applied in units of T, not bytes.
+  const auto* raw_bytes = data.buffers[i]->data();
+  const T* typed_start = reinterpret_cast<const T*>(raw_bytes);
+  return typed_start + data.offset;
+}
+
+template <typename T>
+inline T* GetMutableValues(const ArrayData* data, int i) {
+  // Writable typed pointer to the first logical element of buffer i (offset
+  // applied in units of T). Although `data` is pointer-to-const, the result
+  // permits writes into the buffer's memory.
+  auto* raw_bytes = data->buffers[i]->mutable_data();
+  T* typed_start = reinterpret_cast<T*>(raw_bytes);
+  return typed_start + data->offset;
+}
+
+static inline void CopyData(const ArrayData& input, ArrayData* output) {
+  // Shallow copy of array contents from `input` into `output`. The buffers and
+  // child_data containers are copied, not their underlying memory. The
+  // output's type is deliberately left untouched.
+  output->buffers = input.buffers;
+  output->child_data = input.child_data;
+  output->offset = input.offset;
+  output->length = input.length;
+  output->null_count = input.null_count;
+}
+
+namespace detail {
+
+// Invoke `kernel` on each array piece of `value` (the Array itself, or every
+// chunk of a ChunkedArray), appending one output Datum per piece to
+// `outputs`. Returns Status::Invalid for any other Datum kind.
+Status InvokeUnaryArrayKernel(FunctionContext* ctx, UnaryKernel* kernel,
+                              const Datum& value, std::vector<Datum>* outputs);
+
+// Wrap `arrays` in a Datum of the same kind as `value`: arrays[0] for ARRAY
+// input, a ChunkedArray of all the arrays for CHUNKED_ARRAY input.
+Datum WrapArraysLike(const Datum& value,
+                     const std::vector<std::shared_ptr<Array>>& arrays);
+
+// Same as WrapArraysLike, but takes kernel output Datums (each expected to be
+// of ARRAY kind).
+Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums);
+
+}  // namespace detail
+
+}  // namespace compute
+}  // namespace arrow
+
+#endif  // ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index 8b9a24f..bf29d6a 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -107,14 +107,13 @@ TEST_F(TestPrettyPrint, FixedSizeBinaryType) {
   CheckArray(*array, 0, ex);
 }
 
-TEST_F(TestPrettyPrint, DecimalType) {
+TEST_F(TestPrettyPrint, Decimal128Type) {
   int32_t p = 19;
   int32_t s = 4;
 
   auto type = decimal(p, s);
 
-  DecimalBuilder builder(type);
-
+  Decimal128Builder builder(type);
   Decimal128 val;
 
   ASSERT_OK(Decimal128::FromString("123.4567", &val));
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index 2b0f964..8814fc1 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -42,6 +42,8 @@
 #include "arrow/util/parallel.h"
 #include "arrow/visitor_inline.h"
 
+#include "arrow/compute/api.h"
+
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/common.h"
 #include "arrow/python/config.h"
@@ -57,6 +59,8 @@ namespace py {
 using internal::kPandasTimestampNull;
 using internal::kNanosecondsInDay;
 
+using compute::Datum;
+
 // ----------------------------------------------------------------------
 // Utility code
 
@@ -1028,8 +1032,13 @@ class CategoricalBlock : public PandasBlock {
     std::shared_ptr<Column> converted_col;
     if (options_.strings_to_categorical &&
         (col->type()->id() == Type::STRING || col->type()->id() == Type::BINARY)) {
-      RETURN_NOT_OK(EncodeColumnToDictionary(static_cast<const Column&>(*col), pool_,
-                                             &converted_col));
+      compute::FunctionContext ctx(pool_);
+
+      Datum out;
+      RETURN_NOT_OK(compute::DictionaryEncode(&ctx, Datum(col->data()), &out));
+      DCHECK_EQ(out.kind(), Datum::CHUNKED_ARRAY);
+      converted_col =
+          std::make_shared<Column>(field(col->name(), out.type()), out.chunked_array());
     } else {
       converted_col = col;
     }
@@ -1646,7 +1655,7 @@ class ArrowDeserializer {
       CONVERTVALUES_LISTSLIKE_CASE(FloatType, FLOAT)
       CONVERTVALUES_LISTSLIKE_CASE(DoubleType, DOUBLE)
       CONVERTVALUES_LISTSLIKE_CASE(StringType, STRING)
-      CONVERTVALUES_LISTSLIKE_CASE(DecimalType, DECIMAL)
+      CONVERTVALUES_LISTSLIKE_CASE(Decimal128Type, DECIMAL)
       CONVERTVALUES_LISTSLIKE_CASE(ListType, LIST)
       default: {
         std::stringstream ss;
diff --git a/cpp/src/arrow/python/numpy_to_arrow.cc b/cpp/src/arrow/python/numpy_to_arrow.cc
index a6c28af..0d2df93 100644
--- a/cpp/src/arrow/python/numpy_to_arrow.cc
+++ b/cpp/src/arrow/python/numpy_to_arrow.cc
@@ -42,8 +42,8 @@
 #include "arrow/util/macros.h"
 #include "arrow/visitor_inline.h"
 
-#include "arrow/compute/cast.h"
 #include "arrow/compute/context.h"
+#include "arrow/compute/kernels/cast.h"
 
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/common.h"
@@ -466,13 +466,14 @@ Status NumPyConverter::Convert() {
 
 namespace {
 
-Status CastBuffer(const std::shared_ptr<Buffer>& input, const int64_t length,
-                  const std::shared_ptr<DataType>& in_type,
+Status CastBuffer(const std::shared_ptr<DataType>& in_type,
+                  const std::shared_ptr<Buffer>& input, const int64_t length,
+                  const std::shared_ptr<Buffer>& valid_bitmap, const int64_t null_count,
                   const std::shared_ptr<DataType>& out_type, MemoryPool* pool,
                   std::shared_ptr<Buffer>* out) {
   // Must cast
-  std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, input};
-  auto tmp_data = std::make_shared<ArrayData>(in_type, length, buffers, 0);
+  std::vector<std::shared_ptr<Buffer>> buffers = {valid_bitmap, input};
+  auto tmp_data = std::make_shared<ArrayData>(in_type, length, buffers, null_count);
 
   std::shared_ptr<Array> tmp_array = MakeArray(tmp_data);
   std::shared_ptr<Array> casted_array;
@@ -488,6 +489,21 @@ Status CastBuffer(const std::shared_ptr<Buffer>& input, const int64_t length,
   return Status::OK();
 }
 
+template <typename FromType, typename ToType>
+Status StaticCastBuffer(const Buffer& input, const int64_t length, MemoryPool* pool,
+                        std::shared_ptr<Buffer>* out) {
+  // Element-wise static_cast of `length` FromType values into a newly allocated
+  // pool buffer of ToType values. No range/overflow checking is performed.
+  // `*out` is assigned only after the loop, so `input` may alias the buffer
+  // currently held by `*out`.
+  auto converted = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(converted->Resize(sizeof(ToType) * length));
+
+  const FromType* src = reinterpret_cast<const FromType*>(input.data());
+  ToType* dst = reinterpret_cast<ToType*>(converted->mutable_data());
+  for (int64_t idx = 0; idx < length; ++idx) {
+    dst[idx] = static_cast<ToType>(src[idx]);
+  }
+
+  *out = converted;
+  return Status::OK();
+}
+
 template <typename T, typename T2>
 void CopyStrided(T* input_data, int64_t length, int64_t stride, T2* output_data) {
   // Passing input_data as non-const is a concession to PyObject*
@@ -531,7 +547,7 @@ inline Status NumPyConverter::ConvertData(std::shared_ptr<Buffer>* data) {
   RETURN_NOT_OK(NumPyDtypeToArrow(reinterpret_cast<PyObject*>(dtype_), &input_type));
 
   if (!input_type->Equals(*type_)) {
-    RETURN_NOT_OK(CastBuffer(*data, length_, input_type, type_, pool_, data));
+    RETURN_NOT_OK(CastBuffer(input_type, *data, length_, nullptr, 0, type_, pool_, data));
   }
 
   return Status::OK();
@@ -567,27 +583,32 @@ inline Status NumPyConverter::ConvertData<Date32Type>(std::shared_ptr<Buffer>* d
     *data = std::make_shared<NumPyBuffer>(reinterpret_cast<PyObject*>(arr_));
   }
 
-  // If we have inbound datetime64[D] data, this needs to be downcasted
-  // separately here from int64_t to int32_t, because this data is not
-  // supported in compute::Cast
+  std::shared_ptr<DataType> input_type;
+
   auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(dtype_->c_metadata);
-  if (dtype_->type_num == NPY_DATETIME && date_dtype->meta.base == NPY_FR_D) {
-    auto date32_buffer = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(date32_buffer->Resize(sizeof(int32_t) * length_));
+  if (dtype_->type_num == NPY_DATETIME) {
+    const int64_t null_count = ValuesToBitmap<NPY_DATETIME>(arr_, null_bitmap_data_);
 
-    auto datetime64_values = reinterpret_cast<const int64_t*>((*data)->data());
-    auto date32_values = reinterpret_cast<int32_t*>(date32_buffer->mutable_data());
-    for (int64_t i = 0; i < length_; ++i) {
+    // If we have inbound datetime64[D] data, this needs to be downcasted
+    // separately here from int64_t to int32_t, because this data is not
+    // supported in compute::Cast
+    if (date_dtype->meta.base == NPY_FR_D) {
       // TODO(wesm): How pedantic do we really want to be about checking for int32
       // overflow here?
-      *date32_values++ = static_cast<int32_t>(*datetime64_values++);
+      Status s = StaticCastBuffer<int64_t, int32_t>(**data, length_, pool_, data);
+      RETURN_NOT_OK(s);
+    } else {
+      RETURN_NOT_OK(NumPyDtypeToArrow(reinterpret_cast<PyObject*>(dtype_), &input_type));
+      if (!input_type->Equals(*type_)) {
+        RETURN_NOT_OK(CastBuffer(input_type, *data, length_, null_bitmap_, null_count,
+                                 type_, pool_, data));
+      }
     }
-    *data = date32_buffer;
   } else {
-    std::shared_ptr<DataType> input_type;
     RETURN_NOT_OK(NumPyDtypeToArrow(reinterpret_cast<PyObject*>(dtype_), &input_type));
     if (!input_type->Equals(*type_)) {
-      RETURN_NOT_OK(CastBuffer(*data, length_, input_type, type_, pool_, data));
+      RETURN_NOT_OK(
+          CastBuffer(input_type, *data, length_, nullptr, 0, type_, pool_, data));
     }
   }
 
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index d3145ff..2cff32f 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -63,6 +63,9 @@ class ARROW_EXPORT ChunkedArray {
   ArrayVector chunks_;
   int64_t length_;
   int64_t null_count_;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
 };
 
 /// \brief An immutable column data structure consisting of a field (type
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 9b875ce..77f489a 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -365,6 +365,10 @@ Status MakeArray(const std::vector<uint8_t>& valid_bytes, const std::vector<T>&
     }                                                                                  \
   } while (false)
 
+// Typed-test helpers: declare local aliases for the fixture's T / Type member
+// typedefs (TestFixture is the name gtest gives the fixture inside TYPED_TEST
+// bodies). The expansion already ends with ';'.
+#define DECL_T() typedef typename TestFixture::T T;
+
+#define DECL_TYPE() typedef typename TestFixture::Type Type;
+
 void AssertArraysEqual(const Array& expected, const Array& actual) {
   ASSERT_ARRAYS_EQUAL(expected, actual);
 }
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index 3242fad..48982ca 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -400,7 +400,7 @@ TEST(TestStructType, Basics) {
 }
 
 TEST(TypesTest, TestDecimal128Small) {
-  DecimalType t1(8, 4);
+  Decimal128Type t1(8, 4);
 
   ASSERT_EQ(t1.id(), Type::DECIMAL);
   ASSERT_EQ(t1.precision(), 8);
@@ -414,7 +414,7 @@ TEST(TypesTest, TestDecimal128Small) {
 }
 
 TEST(TypesTest, TestDecimal128Medium) {
-  DecimalType t1(12, 5);
+  Decimal128Type t1(12, 5);
 
   ASSERT_EQ(t1.id(), Type::DECIMAL);
   ASSERT_EQ(t1.precision(), 12);
@@ -428,7 +428,7 @@ TEST(TypesTest, TestDecimal128Medium) {
 }
 
 TEST(TypesTest, TestDecimal128Large) {
-  DecimalType t1(27, 7);
+  Decimal128Type t1(27, 7);
 
   ASSERT_EQ(t1.id(), Type::DECIMAL);
   ASSERT_EQ(t1.precision(), 27);
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index d86e7ef..70f275c 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -498,25 +498,34 @@ class ARROW_EXPORT StructType : public NestedType {
   std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
-class ARROW_EXPORT Decimal128Type : public FixedSizeBinaryType {
+/// \brief Common base for decimal types: a fixed-size binary type whose id is
+/// Type::DECIMAL and which additionally carries precision and scale
+class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
+ public:
+  /// \param byte_width width in bytes of each stored value (e.g. 16 for
+  /// Decimal128Type)
+  /// \param precision decimal precision (stored as-is, not validated here)
+  /// \param scale decimal scale (stored as-is, not validated here)
+  explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t scale)
+      : FixedSizeBinaryType(byte_width, Type::DECIMAL),
+        precision_(precision),
+        scale_(scale) {}
+
+  int32_t precision() const { return precision_; }
+  int32_t scale() const { return scale_; }
+
+ protected:
+  // protected rather than private, presumably so concrete decimal subclasses
+  // can read them directly
+  int32_t precision_;
+  int32_t scale_;
+};
+
+class ARROW_EXPORT Decimal128Type : public DecimalBaseType {
  public:
   static constexpr Type::type type_id = Type::DECIMAL;
 
   explicit Decimal128Type(int32_t precision, int32_t scale)
-      : FixedSizeBinaryType(16, Type::DECIMAL), precision_(precision), scale_(scale) {}
+      : DecimalBaseType(16, precision, scale) {}
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   std::string name() const override { return "decimal"; }
-
-  int32_t precision() const { return precision_; }
-  int32_t scale() const { return scale_; }
-
- private:
-  int32_t precision_;
-  int32_t scale_;
 };
 
+// TODO(wesm): Remove this
 using DecimalType = Decimal128Type;
 
 struct UnionMode {
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index b8b3c5a..9d8a23c 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -28,10 +28,16 @@ class Status;
 
 class DataType;
 class Array;
+struct ArrayData;
 class ArrayBuilder;
 class Field;
 class Tensor;
 
+// Table-like containers. RecordBatch is not repeated here; it is already
+// forward-declared with the other classes below.
+class ChunkedArray;
+class Column;
+class Table;
+
 class Buffer;
 class MemoryPool;
 class RecordBatch;
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 6707f37..4bfce9b 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -430,6 +430,10 @@ static inline bool is_binary_like(Type::type type_id) {
   return false;
 }
 
+/// \brief Return true if the type id denotes a dictionary-encoded type
+static inline bool is_dictionary(Type::type type_id) {
+  switch (type_id) {
+    case Type::DICTIONARY:
+      return true;
+    default:
+      return false;
+  }
+}
+
 }  // namespace arrow
 
 #endif  // ARROW_TYPE_TRAITS_H
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 7810a3b..29b18a9 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -42,6 +42,7 @@ install(FILES
   sse-util.h
   stl.h
   type_traits.h
+  variant.h
   visibility.h
   DESTINATION include/arrow/util)
 
@@ -72,3 +73,5 @@ ADD_ARROW_TEST(decimal-test)
 ADD_ARROW_TEST(key-value-metadata-test)
 ADD_ARROW_TEST(rle-encoding-test)
 ADD_ARROW_TEST(stl-util-test)
+
+add_subdirectory(variant)
diff --git a/cpp/src/arrow/util/variant.h b/cpp/src/arrow/util/variant.h
new file mode 100644
index 0000000..923a868
--- /dev/null
+++ b/cpp/src/arrow/util/variant.h
@@ -0,0 +1,1127 @@
+// Copyright (c) MapBox
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this
+//   list of conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this
+//   list of conditions and the following disclaimer in the documentation and/or
+//   other materials provided with the distribution.
+// - Neither the name "MapBox" nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef ARROW_UTIL_VARIANT_H
+#define ARROW_UTIL_VARIANT_H
+
+#include <cassert>
+#include <cstddef>   // size_t
+#include <cstdint>   // uint_fast8_t, uint_least8_t
+#include <functional>
+#include <limits>
+#include <new>       // operator new
+#include <stdexcept> // runtime_error
+#include <string>
+#include <tuple>
+#include <type_traits>
+#include <typeinfo>
+#include <utility>
+
+#include <arrow/util/variant/recursive_wrapper.h>
+#include <arrow/util/variant/variant_visitor.h>
+
+// clang-format off
+// [[deprecated]] is only available in C++14, use this for the time being
+// ARROW_VARIANT_DEPRECATED: compiler-specific deprecation attribute for
+// C++11-or-older modes, the standard [[deprecated]] otherwise, and nothing
+// on unknown compilers. static_visitor below is marked with it.
+#if __cplusplus <= 201103L
+# ifdef __GNUC__
+#  define ARROW_VARIANT_DEPRECATED __attribute__((deprecated))
+# elif defined(_MSC_VER)
+#  define ARROW_VARIANT_DEPRECATED __declspec(deprecated)
+# else
+#  define ARROW_VARIANT_DEPRECATED
+# endif
+#else
+#  define ARROW_VARIANT_DEPRECATED [[deprecated]]
+#endif
+
+
+// VARIANT_INLINE annotates the dispatch helpers below:
+//   MSVC release (NDEBUG): __forceinline
+//   MSVC debug:            nothing (the text after // is a comment)
+//   other release:         nothing (the text after // is a comment)
+//   other debug:           __attribute__((noinline))
+#ifdef _MSC_VER
+// https://msdn.microsoft.com/en-us/library/bw1hbe6y.aspx
+# ifdef NDEBUG
+#  define VARIANT_INLINE __forceinline
+# else
+#  define VARIANT_INLINE //__declspec(noinline)
+# endif
+#else
+# ifdef NDEBUG
+#  define VARIANT_INLINE //inline __attribute__((always_inline))
+# else
+#  define VARIANT_INLINE __attribute__((noinline))
+# endif
+#endif
+// clang-format on
+
+// Exceptions
+// HAS_EXCEPTIONS is defined (empty) when __EXCEPTIONS is defined (the GCC/Clang
+// macro for exception support) or when compiling with MSVC.
+// NOTE(review): unprefixed macro name in a public header; it may collide with
+// other libraries' definitions.
+#if defined( __EXCEPTIONS) || defined( _MSC_VER)
+#define HAS_EXCEPTIONS
+#endif
+
+// Version of the vendored MapBox variant, encoded as
+// major * 100000 + minor * 100 + patch (1.1.0 -> 100100). The whole expansion
+// is parenthesized so VARIANT_VERSION behaves as one value inside larger
+// expressions (e.g. 2 * VARIANT_VERSION).
+#define VARIANT_MAJOR_VERSION 1
+#define VARIANT_MINOR_VERSION 1
+#define VARIANT_PATCH_VERSION 0
+
+#define VARIANT_VERSION ((VARIANT_MAJOR_VERSION * 100000) + (VARIANT_MINOR_VERSION * 100) + (VARIANT_PATCH_VERSION))
+
+namespace arrow {
+namespace util {
+
+// Exception type for invalid variant access; carries a what() message
+// (presumably thrown by the checked accessors later in this header).
+// XXX This should derive from std::logic_error instead of std::runtime_error.
+//     See https://github.com/mapbox/variant/issues/48 for details.
+class bad_variant_access : public std::runtime_error
+{
+
+public:
+    explicit bad_variant_access(const std::string& what_arg)
+        : runtime_error(what_arg) {}
+
+    explicit bad_variant_access(const char* what_arg)
+        : runtime_error(what_arg) {}
+
+}; // class bad_variant_access
+
+// Deprecated visitor base: derived functors expose `result_type` = R, which
+// result_of_unary_visit / result_of_binary_visit below pick up. The protected
+// constructor and destructor allow use only as a base class; the destructor is
+// not virtual, so do not delete derived objects through this base.
+template <typename R = void>
+struct ARROW_VARIANT_DEPRECATED static_visitor
+{
+    using result_type = R;
+
+protected:
+    static_visitor() {}
+    ~static_visitor() {}
+};
+
+// Integer type used for the index that records which alternative a variant
+// holds (variant_helper below dispatches on it). Defaults to unsigned int.
+// Defining ARROW_VARIANT_MINIMIZE_SIZE selects an 8-bit-capable type instead:
+// std::uint_fast8_t when ARROW_VARIANT_OPTIMIZE_FOR_SPEED is also defined,
+// std::uint_least8_t otherwise. Those aliases are declared in <cstdint>, which
+// must be included explicitly for that branch to compile. Every choice is
+// unsigned, so type_index_t(-1) is the type's maximum value.
+#if !defined(ARROW_VARIANT_MINIMIZE_SIZE)
+using type_index_t = unsigned int;
+#else
+#if defined(ARROW_VARIANT_OPTIMIZE_FOR_SPEED)
+using type_index_t = std::uint_fast8_t;
+#else
+using type_index_t = std::uint_least8_t;
+#endif
+#endif
+
+namespace detail {
+
+// Sentinel index meaning "no matching alternative" (all bits set, since
+// type_index_t is unsigned).
+static constexpr type_index_t invalid_value = type_index_t(-1);
+
+// direct_type<T, Types...>::index: index of the first alternative in Types
+// that is exactly T, counted from the END of the list (the last alternative
+// has index 0, the first has sizeof...(Types) - 1). invalid_value when T is
+// not one of Types.
+template <typename T, typename... Types>
+struct direct_type;
+
+template <typename T, typename First, typename... Types>
+struct direct_type<T, First, Types...>
+{
+    static constexpr type_index_t index = std::is_same<T, First>::value
+        ? sizeof...(Types)
+        : direct_type<T, Types...>::index;
+};
+
+// Empty list: no match.
+template <typename T>
+struct direct_type<T>
+{
+    static constexpr type_index_t index = invalid_value;
+};
+
+// Logical AND / OR over type traits. Use std::conjunction / std::disjunction
+// when the standard library provides them (feature-test macro
+// __cpp_lib_logical_traits). Otherwise fall back to equivalent definitions that
+// short-circuit: instantiation stops at the first false trait (conjunction) or
+// the first true trait (disjunction).
+#if __cpp_lib_logical_traits >= 201510L
+
+using std::conjunction;
+using std::disjunction;
+
+#else
+
+template <typename...>
+struct conjunction : std::true_type {};
+
+template <typename B1>
+struct conjunction<B1> : B1 {};
+
+template <typename B1, typename B2>
+struct conjunction<B1, B2> : std::conditional<B1::value, B2, B1>::type {};
+
+template <typename B1, typename... Bs>
+struct conjunction<B1, Bs...> : std::conditional<B1::value, conjunction<Bs...>, B1>::type {};
+
+template <typename...>
+struct disjunction : std::false_type {};
+
+template <typename B1>
+struct disjunction<B1> : B1 {};
+
+template <typename B1, typename B2>
+struct disjunction<B1, B2> : std::conditional<B1::value, B1, B2>::type {};
+
+template <typename B1, typename... Bs>
+struct disjunction<B1, Bs...> : std::conditional<B1::value, B1, disjunction<Bs...>>::type {};
+
+#endif
+
+// convertible_type<T, Types...>::index: finds the first alternative T is
+// implicitly convertible to and returns its end-relative index (same
+// convention as direct_type). This holds only if T is NOT also convertible to
+// any later alternative; an ambiguous conversion yields invalid_value. If T
+// converts to no alternative, the result is also invalid_value.
+template <typename T, typename... Types>
+struct convertible_type;
+
+template <typename T, typename First, typename... Types>
+struct convertible_type<T, First, Types...>
+{
+    static constexpr type_index_t index = std::is_convertible<T, First>::value
+        ? disjunction<std::is_convertible<T, Types>...>::value ? invalid_value : sizeof...(Types)
+        : convertible_type<T, Types...>::index;
+};
+
+// Empty list: no conversion found.
+template <typename T>
+struct convertible_type<T>
+{
+    static constexpr type_index_t index = invalid_value;
+};
+
+// value_traits<T, Types...>: decides which alternative of Types a value of
+// type T (with reference and top-level const removed) maps to. Preference
+// order: exact match, then recursive_wrapper<value_type>, then a unique
+// implicit conversion.
+//   index       - end-relative index (see direct_type), or invalid_value
+//   is_valid    - whether any alternative was found
+//   tindex      - 1-based position within Types (0 when invalid)
+//   target_type - the chosen alternative type; void when invalid
+template <typename T, typename... Types>
+struct value_traits
+{
+    using value_type = typename std::remove_const<typename std::remove_reference<T>::type>::type;
+    using value_type_wrapper = recursive_wrapper<value_type>;
+    static constexpr type_index_t direct_index = direct_type<value_type, Types...>::index;
+    static constexpr bool is_direct = direct_index != invalid_value;
+    static constexpr type_index_t index_direct_or_wrapper = is_direct ? direct_index : direct_type<value_type_wrapper, Types...>::index;
+    static constexpr bool is_direct_or_wrapper = index_direct_or_wrapper != invalid_value;
+    static constexpr type_index_t index = is_direct_or_wrapper ? index_direct_or_wrapper : convertible_type<value_type, Types...>::index;
+    static constexpr bool is_valid = index != invalid_value;
+    // Convert the end-relative index to a position in tuple<void, Types...>
+    static constexpr type_index_t tindex = is_valid ? sizeof...(Types)-index : 0;
+    using target_type = typename std::tuple_element<tindex, std::tuple<void, Types...>>::type;
+};
+
+// Maps any well-formed type T to R (default void). Used below to detect
+// whether F::result_type exists (a pre-C++17 void_t-style idiom).
+template <typename T, typename R = void>
+struct enable_if_type
+{
+    using type = R;
+};
+
+// Result type of visiting with functor F: F::result_type when F declares one
+// (e.g. functors deriving from static_visitor). Otherwise it is deduced with
+// std::result_of from calling F with (V&) for unary visits or (V&, V&) for
+// binary visits.
+// NOTE(review): std::result_of is deprecated in C++17 and removed in C++20.
+template <typename F, typename V, typename Enable = void>
+struct result_of_unary_visit
+{
+    using type = typename std::result_of<F(V&)>::type;
+};
+
+template <typename F, typename V>
+struct result_of_unary_visit<F, V, typename enable_if_type<typename F::result_type>::type>
+{
+    using type = typename F::result_type;
+};
+
+template <typename F, typename V, typename Enable = void>
+struct result_of_binary_visit
+{
+    using type = typename std::result_of<F(V&, V&)>::type;
+};
+
+template <typename F, typename V>
+struct result_of_binary_visit<F, V, typename enable_if_type<typename F::result_type>::type>
+{
+    using type = typename F::result_type;
+};
+
+// static_max<args...>::value: compile-time maximum of a non-empty list of
+// type_index_t values.
+template <type_index_t arg1, type_index_t... others>
+struct static_max;
+
+template <type_index_t arg>
+struct static_max<arg>
+{
+    static const type_index_t value = arg;
+};
+
+template <type_index_t arg1, type_index_t arg2, type_index_t... others>
+struct static_max<arg1, arg2, others...>
+{
+    static const type_index_t value = arg1 >= arg2 ? static_max<arg1, others...>::value : static_max<arg2, others...>::value;
+};
+
+// variant_helper<Types...>: runtime dispatch on a type index (end-relative, as
+// in direct_type) to destroy, move-construct or copy-construct the alternative
+// stored in raw storage. Each level checks whether the index names its head
+// type T and otherwise recurses into the tail.
+template <typename... Types>
+struct variant_helper;
+
+template <typename T, typename... Types>
+struct variant_helper<T, Types...>
+{
+    // Run ~T() on the object in `data` if type_index selects T.
+    VARIANT_INLINE static void destroy(const type_index_t type_index, void* data)
+    {
+        if (type_index == sizeof...(Types))
+        {
+            reinterpret_cast<T*>(data)->~T();
+        }
+        else
+        {
+            variant_helper<Types...>::destroy(type_index, data);
+        }
+    }
+
+    // Placement-new a T into `new_value`, move-constructed from `old_value`.
+    // The moved-from object is not destroyed here.
+    VARIANT_INLINE static void move(const type_index_t old_type_index, void* old_value, void* new_value)
+    {
+        if (old_type_index == sizeof...(Types))
+        {
+            new (new_value) T(std::move(*reinterpret_cast<T*>(old_value)));
+        }
+        else
+        {
+            variant_helper<Types...>::move(old_type_index, old_value, new_value);
+        }
+    }
+
+    // Placement-new a T into `new_value`, copy-constructed from `old_value`.
+    VARIANT_INLINE static void copy(const type_index_t old_type_index, const void* old_value, void* new_value)
+    {
+        if (old_type_index == sizeof...(Types))
+        {
+            new (new_value) T(*reinterpret_cast<const T*>(old_value));
+        }
+        else
+        {
+            variant_helper<Types...>::copy(old_type_index, old_value, new_value);
+        }
+    }
+};
+
+// End of recursion: no alternative matched (e.g. invalid_value), so do nothing.
+template <>
+struct variant_helper<>
+{
+    VARIANT_INLINE static void destroy(const type_index_t, void*) {}
+    VARIANT_INLINE static void move(const type_index_t, void*, void*) {}
+    VARIANT_INLINE static void copy(const type_index_t, const void*, void*) {}
+};
+
+// unwrapper<T>::apply / apply_const: return a reference to the value a stored
+// alternative represents. Plain types are returned as-is; recursive_wrapper<T>
+// and std::reference_wrapper<T> are unwrapped via .get(), so visitors see the
+// underlying value.
+template <typename T>
+struct unwrapper
+{
+    static T const& apply_const(T const& obj) { return obj; }
+    static T& apply(T& obj) { return obj; }
+};
+
+template <typename T>
+struct unwrapper<recursive_wrapper<T>>
+{
+    static auto apply_const(recursive_wrapper<T> const& obj)
+        -> typename recursive_wrapper<T>::type const&
+    {
+        return obj.get();
+    }
+    static auto apply(recursive_wrapper<T>& obj)
+        -> typename recursive_wrapper<T>::type&
+    {
+        return obj.get();
+    }
+};
+
+template <typename T>
+struct unwrapper<std::reference_wrapper<T>>
+{
+    static auto apply_const(std::reference_wrapper<T> const& obj)
+        -> typename std::reference_wrapper<T>::type const&
+    {
+        return obj.get();
+    }
+    static auto apply(std::reference_wrapper<T>& obj)
+        -> typename std::reference_wrapper<T>::type&
+    {
+        return obj.get();
+    }
+};
+
+// dispatcher<F, V, R, Types...>: unary visitation. Tests the active
+// alternative of `v` against each type in turn and invokes `f` with the
+// (unwrapped) value of the first match, returning the result as R.
+template <typename F, typename V, typename R, typename... Types>
+struct dispatcher;
+
+template <typename F, typename V, typename R, typename T, typename... Types>
+struct dispatcher<F, V, R, T, Types...>
+{
+    VARIANT_INLINE static R apply_const(V const& v, F&& f)
+    {
+        if (v.template is<T>())
+        {
+            return f(unwrapper<T>::apply_const(v.template get_unchecked<T>()));
+        }
+        else
+        {
+            return dispatcher<F, V, R, Types...>::apply_const(v, std::forward<F>(f));
+        }
+    }
+
+    VARIANT_INLINE static R apply(V& v, F&& f)
+    {
+        if (v.template is<T>())
+        {
+            return f(unwrapper<T>::apply(v.template get_unchecked<T>()));
+        }
+        else
+        {
+            return dispatcher<F, V, R, Types...>::apply(v, std::forward<F>(f));
+        }
+    }
+};
+
+// Last alternative: no is<T>() check is made. Every earlier alternative was
+// ruled out, so `v` is assumed to hold T.
+template <typename F, typename V, typename R, typename T>
+struct dispatcher<F, V, R, T>
+{
+    VARIANT_INLINE static R apply_const(V const& v, F&& f)
+    {
+        return f(unwrapper<T>::apply_const(v.template get_unchecked<T>()));
+    }
+
+    VARIANT_INLINE static R apply(V& v, F&& f)
+    {
+        return f(unwrapper<T>::apply(v.template get_unchecked<T>()));
+    }
+};
+
+// binary_dispatcher_rhs<F, V, R, T0, T1, Types...>: binary visitation with the
+// lhs type fixed. `lhs` is read with get_unchecked<T0> unconditionally, so the
+// caller must already have established that lhs holds T0. rhs's active
+// alternative is tested against T1, then each of Types. On the first match,
+// f(lhs-as-T0, rhs-as-match) is invoked.
+template <typename F, typename V, typename R, typename T, typename... Types>
+struct binary_dispatcher_rhs;
+
+template <typename F, typename V, typename R, typename T0, typename T1, typename... Types>
+struct binary_dispatcher_rhs<F, V, R, T0, T1, Types...>
+{
+    VARIANT_INLINE static R apply_const(V const& lhs, V const& rhs, F&& f)
+    {
+        if (rhs.template is<T1>()) // call binary functor
+        {
+            return f(unwrapper<T0>::apply_const(lhs.template get_unchecked<T0>()),
+                     unwrapper<T1>::apply_const(rhs.template get_unchecked<T1>()));
+        }
+        else
+        {
+            return binary_dispatcher_rhs<F, V, R, T0, Types...>::apply_const(lhs, rhs, std::forward<F>(f));
+        }
+    }
+
+    VARIANT_INLINE static R apply(V& lhs, V& rhs, F&& f)
+    {
+        if (rhs.template is<T1>()) // call binary functor
+        {
+            return f(unwrapper<T0>::apply(lhs.template get_unchecked<T0>()),
+                     unwrapper<T1>::apply(rhs.template get_unchecked<T1>()));
+        }
+        else
+        {
+            return binary_dispatcher_rhs<F, V, R, T0, Types...>::apply(lhs, rhs, std::forward<F>(f));
+        }
+    }
+};
+
+// Last candidate: rhs is assumed to hold T1 without checking.
+template <typename F, typename V, typename R, typename T0, typename T1>
+struct binary_dispatcher_rhs<F, V, R, T0, T1>
+{
+    VARIANT_INLINE static R apply_const(V const& lhs, V const& rhs, F&& f)
+    {
+        return f(unwrapper<T0>::apply_const(lhs.template get_unchecked<T0>()),
+                 unwrapper<T1>::apply_const(rhs.template get_unchecked<T1>()));
+    }
+
+    VARIANT_INLINE static R apply(V& lhs, V& rhs, F&& f)
+    {
+        return f(unwrapper<T0>::apply(lhs.template get_unchecked<T0>()),
+                 unwrapper<T1>::apply(rhs.template get_unchecked<T1>()));
+    }
+};
+
+// Mirror of binary_dispatcher_rhs, used when the right operand is already
+// known to hold T0: scans T1, Types... for the alternative held by `lhs`
+// and calls f(lhs as T1, rhs as T0). The last candidate is taken unchecked.
+template <typename F, typename V, typename R, typename T, typename... Types>
+struct binary_dispatcher_lhs;
+
+template <typename F, typename V, typename R, typename T0, typename T1, typename... Types>
+struct binary_dispatcher_lhs<F, V, R, T0, T1, Types...>
+{
+    VARIANT_INLINE static R apply_const(V const& lhs, V const& rhs, F&& f)
+    {
+        if (!lhs.template is<T1>())
+        {
+            // lhs is not T1: keep T0 fixed for rhs and try the next candidate.
+            return binary_dispatcher_lhs<F, V, R, T0, Types...>::apply_const(lhs, rhs, std::forward<F>(f));
+        }
+        return f(unwrapper<T1>::apply_const(lhs.template get_unchecked<T1>()),
+                 unwrapper<T0>::apply_const(rhs.template get_unchecked<T0>()));
+    }
+
+    VARIANT_INLINE static R apply(V& lhs, V& rhs, F&& f)
+    {
+        if (!lhs.template is<T1>())
+        {
+            // lhs is not T1: keep T0 fixed for rhs and try the next candidate.
+            return binary_dispatcher_lhs<F, V, R, T0, Types...>::apply(lhs, rhs, std::forward<F>(f));
+        }
+        return f(unwrapper<T1>::apply(lhs.template get_unchecked<T1>()),
+                 unwrapper<T0>::apply(rhs.template get_unchecked<T0>()));
+    }
+};
+
+// Terminal case: lhs is assumed to hold T1 (all other candidates ruled out).
+template <typename F, typename V, typename R, typename T0, typename T1>
+struct binary_dispatcher_lhs<F, V, R, T0, T1>
+{
+    VARIANT_INLINE static R apply_const(V const& lhs, V const& rhs, F&& f)
+    {
+        return f(unwrapper<T1>::apply_const(lhs.template get_unchecked<T1>()),
+                 unwrapper<T0>::apply_const(rhs.template get_unchecked<T0>()));
+    }
+
+    VARIANT_INLINE static R apply(V& lhs, V& rhs, F&& f)
+    {
+        return f(unwrapper<T1>::apply(lhs.template get_unchecked<T1>()),
+                 unwrapper<T0>::apply(rhs.template get_unchecked<T0>()));
+    }
+};
+
+// Binary dispatch: finds the alternatives held by v0 and v1 and calls
+// f(v0 as A, v1 as B). At each step, if both hold T the functor is called
+// directly; if only one holds T, the other operand is resolved among the
+// remaining types by binary_dispatcher_rhs / binary_dispatcher_lhs;
+// otherwise neither holds T and the search continues with Types...
+template <typename F, typename V, typename R, typename... Types>
+struct binary_dispatcher;
+
+template <typename F, typename V, typename R, typename T, typename... Types>
+struct binary_dispatcher<F, V, R, T, Types...>
+{
+    VARIANT_INLINE static R apply_const(V const& v0, V const& v1, F&& f)
+    {
+        if (v0.template is<T>())
+        {
+            if (v1.template is<T>())
+            {
+                // Both operands hold T: call the binary functor.
+                return f(unwrapper<T>::apply_const(v0.template get_unchecked<T>()),
+                         unwrapper<T>::apply_const(v1.template get_unchecked<T>()));
+            }
+            return binary_dispatcher_rhs<F, V, R, T, Types...>::apply_const(v0, v1, std::forward<F>(f));
+        }
+        if (v1.template is<T>())
+        {
+            return binary_dispatcher_lhs<F, V, R, T, Types...>::apply_const(v0, v1, std::forward<F>(f));
+        }
+        return binary_dispatcher<F, V, R, Types...>::apply_const(v0, v1, std::forward<F>(f));
+    }
+
+    VARIANT_INLINE static R apply(V& v0, V& v1, F&& f)
+    {
+        if (v0.template is<T>())
+        {
+            if (v1.template is<T>())
+            {
+                // Both operands hold T: call the binary functor.
+                return f(unwrapper<T>::apply(v0.template get_unchecked<T>()),
+                         unwrapper<T>::apply(v1.template get_unchecked<T>()));
+            }
+            return binary_dispatcher_rhs<F, V, R, T, Types...>::apply(v0, v1, std::forward<F>(f));
+        }
+        if (v1.template is<T>())
+        {
+            return binary_dispatcher_lhs<F, V, R, T, Types...>::apply(v0, v1, std::forward<F>(f));
+        }
+        return binary_dispatcher<F, V, R, Types...>::apply(v0, v1, std::forward<F>(f));
+    }
+};
+
+// Terminal case: both operands are assumed to hold the last alternative T.
+template <typename F, typename V, typename R, typename T>
+struct binary_dispatcher<F, V, R, T>
+{
+    VARIANT_INLINE static R apply_const(V const& v0, V const& v1, F&& f)
+    {
+        return f(unwrapper<T>::apply_const(v0.template get_unchecked<T>()),
+                 unwrapper<T>::apply_const(v1.template get_unchecked<T>()));
+    }
+
+    VARIANT_INLINE static R apply(V& v0, V& v1, F&& f)
+    {
+        return f(unwrapper<T>::apply(v0.template get_unchecked<T>()),
+                 unwrapper<T>::apply(v1.template get_unchecked<T>()));
+    }
+};
+
+// Comparator functors. Both operands always have the same type T because
+// the caller compares two values of the same alternative.
+// equal_comp: true when the two values compare equal with operator==.
+struct equal_comp
+{
+    template <typename T>
+    bool operator()(T const& left, T const& right) const
+    {
+        return left == right;
+    }
+};
+
+// less_comp: true when `left` orders strictly before `right` via operator<.
+struct less_comp
+{
+    template <typename T>
+    bool operator()(T const& left, T const& right) const
+    {
+        return left < right;
+    }
+};
+
+// Unary visitor that compares a captured left-hand variant against the
+// visited right-hand value using Comp. The left value is read with
+// get_unchecked<T>(), so the caller must ensure both variants hold the same
+// alternative before visiting.
+template <typename Variant, typename Comp>
+class comparer
+{
+public:
+    explicit comparer(Variant const& lhs) noexcept
+        : left_(lhs)
+    {
+    }
+
+    comparer& operator=(comparer const&) = delete;
+
+    // Visitor entry point: T is the alternative currently held by the rhs.
+    template <typename T>
+    bool operator()(T const& rhs_content) const
+    {
+        T const& left_value = left_.template get_unchecked<T>();
+        return Comp()(left_value, rhs_content);
+    }
+
+private:
+    // Left-hand operand, held by reference; it must outlive this visitor.
+    Variant const& left_;
+};
+
+// Visitor that hashes the visited value with the std::hash specialization
+// for its type.
+struct hasher
+{
+    template <typename T>
+    std::size_t operator()(const T& value) const
+    {
+        std::hash<T> hash_fn{};
+        return hash_fn(value);
+    }
+};
+
+} // namespace detail
+
+// Tag type: a variant constructed from no_init holds no value
+// (its valid() reports false).
+struct no_init
+{
+};
+
+// Type-safe discriminated union over Types...
+//
+// The value lives in an aligned raw buffer sized and aligned for the largest
+// alternative. `type_index` numbers the alternatives in *reverse* declaration
+// order (the first type has index sizeof...(Types)-1); which() converts it
+// back to a zero-based declaration position. detail::invalid_value marks a
+// variant that holds no object (no_init construction, or an assignment whose
+// copy/move threw).
+template <typename... Types>
+class variant
+{
+    static_assert(sizeof...(Types) > 0, "Template parameter type list of variant can not be empty.");
+    static_assert(!detail::disjunction<std::is_reference<Types>...>::value, "Variant can not hold reference types. Maybe use std::reference_wrapper?");
+    static_assert(!detail::disjunction<std::is_array<Types>...>::value, "Variant can not hold array types.");
+    static_assert(sizeof...(Types) < std::numeric_limits<type_index_t>::max(), "Internal index type must be able to accommodate all alternatives.");
+private:
+    // Storage size/alignment: the maximum over all alternatives.
+    static const std::size_t data_size = detail::static_max<sizeof(Types)...>::value;
+    static const std::size_t data_align = detail::static_max<alignof(Types)...>::value;
+public:
+    struct adapted_variant_tag;
+    using types = std::tuple<Types...>;
+private:
+    using first_type = typename std::tuple_element<0, types>::type;
+    using data_type = typename std::aligned_storage<data_size, data_align>::type;
+    using helper_type = detail::variant_helper<Types...>;
+
+    // Reverse-ordered index of the active alternative, or detail::invalid_value.
+    type_index_t type_index;
+    data_type data;
+
+public:
+    // Default construction value-initializes the first alternative.
+    VARIANT_INLINE variant() noexcept(std::is_nothrow_default_constructible<first_type>::value)
+        : type_index(sizeof...(Types)-1)
+    {
+        static_assert(std::is_default_constructible<first_type>::value, "First type in variant must be default constructible to allow default construction of variant.");
+        new (&data) first_type();
+    }
+
+    // Leaves the variant without a value; valid() returns false.
+    VARIANT_INLINE variant(no_init) noexcept
+        : type_index(detail::invalid_value) {}
+
+    // Converting constructor: stores `val` as the alternative selected by
+    // detail::value_traits. Disabled when no alternative accepts T and when T
+    // is this variant type itself (so the copy/move constructors are used).
+    // http://isocpp.org/blog/2012/11/universal-references-in-c11-scott-meyers
+    template <typename T, typename Traits = detail::value_traits<T, Types...>,
+              typename Enable = typename std::enable_if<Traits::is_valid && !std::is_same<variant<Types...>, typename Traits::value_type>::value>::type >
+    VARIANT_INLINE variant(T&& val) noexcept(std::is_nothrow_constructible<typename Traits::target_type, T&&>::value)
+        : type_index(Traits::index)
+    {
+        new (&data) typename Traits::target_type(std::forward<T>(val));
+    }
+
+    VARIANT_INLINE variant(variant<Types...> const& old)
+        : type_index(old.type_index)
+    {
+        helper_type::copy(old.type_index, &old.data, &data);
+    }
+
+    VARIANT_INLINE variant(variant<Types...>&& old)
+        noexcept(detail::conjunction<std::is_nothrow_move_constructible<Types>...>::value)
+        : type_index(old.type_index)
+    {
+        helper_type::move(old.type_index, &old.data, &data);
+    }
+
+private:
+    // Both helpers destroy the current value first and park type_index at
+    // invalid_value while the new value is built, so if copying/moving throws
+    // the variant is left empty instead of naming a destroyed alternative.
+    VARIANT_INLINE void copy_assign(variant<Types...> const& rhs)
+    {
+        helper_type::destroy(type_index, &data);
+        type_index = detail::invalid_value;
+        helper_type::copy(rhs.type_index, &rhs.data, &data);
+        type_index = rhs.type_index;
+    }
+
+    VARIANT_INLINE void move_assign(variant<Types...>&& rhs)
+    {
+        helper_type::destroy(type_index, &data);
+        type_index = detail::invalid_value;
+        helper_type::move(rhs.type_index, &rhs.data, &data);
+        type_index = rhs.type_index;
+    }
+
+public:
+    VARIANT_INLINE variant<Types...>& operator=(variant<Types...>&& other)
+    {
+        move_assign(std::move(other));
+        return *this;
+    }
+
+    VARIANT_INLINE variant<Types...>& operator=(variant<Types...> const& other)
+    {
+        copy_assign(other);
+        return *this;
+    }
+
+    // conversions
+    // move-assign: build a temporary variant from rhs, then move it in.
+    // Deliberately NOT noexcept: constructing the temporary can throw (e.g.
+    // when the selected alternative allocates), and an unconditional
+    // noexcept would turn such an exception into std::terminate().
+    template <typename T>
+    VARIANT_INLINE variant<Types...>& operator=(T&& rhs)
+    {
+        variant<Types...> temp(std::forward<T>(rhs));
+        move_assign(std::move(temp));
+        return *this;
+    }
+
+    // copy-assign: build a temporary variant from rhs, then copy it in.
+    template <typename T>
+    VARIANT_INLINE variant<Types...>& operator=(T const& rhs)
+    {
+        variant<Types...> temp(rhs);
+        copy_assign(temp);
+        return *this;
+    }
+
+    // is<T>(): true when the active alternative is T (directly, or stored as
+    // recursive_wrapper<T>).
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE bool is() const
+    {
+        return type_index == detail::direct_type<T, Types...>::index;
+    }
+
+    template <typename T,typename std::enable_if<
+                         (detail::direct_type<recursive_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE bool is() const
+    {
+        return type_index == detail::direct_type<recursive_wrapper<T>, Types...>::index;
+    }
+
+    // False only for a variant that holds no value.
+    VARIANT_INLINE bool valid() const
+    {
+        return type_index != detail::invalid_value;
+    }
+
+    // Destroys the current value and constructs a T in place from args. The
+    // variant is marked invalid while T's constructor runs, so a throwing
+    // constructor leaves it empty.
+    template <typename T, typename... Args>
+    VARIANT_INLINE void set(Args&&... args)
+    {
+        helper_type::destroy(type_index, &data);
+        type_index = detail::invalid_value;
+        new (&data) T(std::forward<Args>(args)...);
+        type_index = detail::direct_type<T, Types...>::index;
+    }
+
+    // Accessors. get_unchecked<T>() performs no type check (using the wrong T
+    // is undefined behaviour); get<T>() is only available when
+    // HAS_EXCEPTIONS is defined and throws bad_variant_access on a mismatch.
+
+    // get_unchecked<T>()
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get_unchecked()
+    {
+        return *reinterpret_cast<T*>(&data);
+    }
+
+#ifdef HAS_EXCEPTIONS
+    // get<T>()
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get()
+    {
+        if (type_index == detail::direct_type<T, Types...>::index)
+        {
+            return *reinterpret_cast<T*>(&data);
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get_unchecked() const
+    {
+        return *reinterpret_cast<T const*>(&data);
+    }
+
+#ifdef HAS_EXCEPTIONS
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get() const
+    {
+        if (type_index == detail::direct_type<T, Types...>::index)
+        {
+            return *reinterpret_cast<T const*>(&data);
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    // get_unchecked<T>() - T stored as recursive_wrapper<T>
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<recursive_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get_unchecked()
+    {
+        return (*reinterpret_cast<recursive_wrapper<T>*>(&data)).get();
+    }
+
+#ifdef HAS_EXCEPTIONS
+    // get<T>() - T stored as recursive_wrapper<T>
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<recursive_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get()
+    {
+        if (type_index == detail::direct_type<recursive_wrapper<T>, Types...>::index)
+        {
+            return (*reinterpret_cast<recursive_wrapper<T>*>(&data)).get();
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<recursive_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get_unchecked() const
+    {
+        return (*reinterpret_cast<recursive_wrapper<T> const*>(&data)).get();
+    }
+
+#ifdef HAS_EXCEPTIONS
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<recursive_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get() const
+    {
+        if (type_index == detail::direct_type<recursive_wrapper<T>, Types...>::index)
+        {
+            return (*reinterpret_cast<recursive_wrapper<T> const*>(&data)).get();
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    // get_unchecked<T>() - T stored as std::reference_wrapper<T>
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<std::reference_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get_unchecked()
+    {
+        return (*reinterpret_cast<std::reference_wrapper<T>*>(&data)).get();
+    }
+
+#ifdef HAS_EXCEPTIONS
+    // get<T>() - T stored as std::reference_wrapper<T>
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<std::reference_wrapper<T>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T& get()
+    {
+        if (type_index == detail::direct_type<std::reference_wrapper<T>, Types...>::index)
+        {
+            return (*reinterpret_cast<std::reference_wrapper<T>*>(&data)).get();
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<std::reference_wrapper<T const>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get_unchecked() const
+    {
+        return (*reinterpret_cast<std::reference_wrapper<T const> const*>(&data)).get();
+    }
+
+#ifdef HAS_EXCEPTIONS
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<std::reference_wrapper<T const>, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE T const& get() const
+    {
+        if (type_index == detail::direct_type<std::reference_wrapper<T const>, Types...>::index)
+        {
+            return (*reinterpret_cast<std::reference_wrapper<T const> const*>(&data)).get();
+        }
+        else
+        {
+            throw bad_variant_access("in get<T>()");
+        }
+    }
+#endif
+
+    // This function is deprecated because it returns an internal index field.
+    // Use which() instead.
+    ARROW_VARIANT_DEPRECATED VARIANT_INLINE type_index_t get_type_index() const
+    {
+        return type_index;
+    }
+
+    // Zero-based position of the active alternative within Types...
+    VARIANT_INLINE int which() const noexcept
+    {
+        return static_cast<int>(sizeof...(Types) - type_index - 1);
+    }
+
+    // Compile-time zero-based position of T within Types...
+    template <typename T, typename std::enable_if<
+                          (detail::direct_type<T, Types...>::index != detail::invalid_value)>::type* = nullptr>
+    VARIANT_INLINE static constexpr int which() noexcept
+    {
+        return static_cast<int>(sizeof...(Types)-detail::direct_type<T, Types...>::index - 1);
+    }
+
+    // visitor
+    // R defaults to the result of applying F to the first alternative.
+    // unary
+    template <typename F, typename V, typename R = typename detail::result_of_unary_visit<F, first_type>::type>
+    auto VARIANT_INLINE static visit(V const& v, F&& f)
+        -> decltype(detail::dispatcher<F, V, R, Types...>::apply_const(v, std::forward<F>(f)))
+    {
+        return detail::dispatcher<F, V, R, Types...>::apply_const(v, std::forward<F>(f));
+    }
+    // non-const
+    template <typename F, typename V, typename R = typename detail::result_of_unary_visit<F, first_type>::type>
+    auto VARIANT_INLINE static visit(V& v, F&& f)
+        -> decltype(detail::dispatcher<F, V, R, Types...>::apply(v, std::forward<F>(f)))
+    {
+        return detail::dispatcher<F, V, R, Types...>::apply(v, std::forward<F>(f));
+    }
+
+    // binary
+    // const
+    template <typename F, typename V, typename R = typename detail::result_of_binary_visit<F, first_type>::type>
+    auto VARIANT_INLINE static binary_visit(V const& v0, V const& v1, F&& f)
+        -> decltype(detail::binary_dispatcher<F, V, R, Types...>::apply_const(v0, v1, std::forward<F>(f)))
+    {
+        return detail::binary_dispatcher<F, V, R, Types...>::apply_const(v0, v1, std::forward<F>(f));
+    }
+    // non-const
+    template <typename F, typename V, typename R = typename detail::result_of_binary_visit<F, first_type>::type>
+    auto VARIANT_INLINE static binary_visit(V& v0, V& v1, F&& f)
+        -> decltype(detail::binary_dispatcher<F, V, R, Types...>::apply(v0, v1, std::forward<F>(f)))
+    {
+        return detail::binary_dispatcher<F, V, R, Types...>::apply(v0, v1, std::forward<F>(f));
+    }
+
+    // match: combines the given callables with make_visitor and visits *this.
+    // unary
+    template <typename... Fs>
+    auto VARIANT_INLINE match(Fs&&... fs) const
+        -> decltype(variant::visit(*this, ::arrow::util::make_visitor(std::forward<Fs>(fs)...)))
+    {
+        return variant::visit(*this, ::arrow::util::make_visitor(std::forward<Fs>(fs)...));
+    }
+    // non-const
+    template <typename... Fs>
+    auto VARIANT_INLINE match(Fs&&... fs)
+        -> decltype(variant::visit(*this, ::arrow::util::make_visitor(std::forward<Fs>(fs)...)))
+    {
+        return variant::visit(*this, ::arrow::util::make_visitor(std::forward<Fs>(fs)...));
+    }
+
+    ~variant() noexcept // no-throw destructor
+    {
+        helper_type::destroy(type_index, &data);
+    }
+
+    // comparison operators
+    // Variants holding different alternatives are ordered by which(); values
+    // of the same alternative are compared with that type's operators.
+    // equality
+    VARIANT_INLINE bool operator==(variant const& rhs) const
+    {
+        assert(valid() && rhs.valid());
+        if (this->which() != rhs.which())
+        {
+            return false;
+        }
+        detail::comparer<variant, detail::equal_comp> visitor(*this);
+        return visit(rhs, visitor);
+    }
+
+    VARIANT_INLINE bool operator!=(variant const& rhs) const
+    {
+        return !(*this == rhs);
+    }
+
+    // less than
+    VARIANT_INLINE bool operator<(variant const& rhs) const
+    {
+        assert(valid() && rhs.valid());
+        if (this->which() != rhs.which())
+        {
+            return this->which() < rhs.which();
+        }
+        detail::comparer<variant, detail::less_comp> visitor(*this);
+        return visit(rhs, visitor);
+    }
+    VARIANT_INLINE bool operator>(variant const& rhs) const
+    {
+        return rhs < *this;
+    }
+    VARIANT_INLINE bool operator<=(variant const& rhs) const
+    {
+        return !(*this > rhs);
+    }
+    VARIANT_INLINE bool operator>=(variant const& rhs) const
+    {
+        return !(*this < rhs);
+    }
+};
+
+// Free-function visitation: forwards to the variant type's static
+// visit() (one operand) or binary_visit() (two operands).
+
+// Unary, const operand.
+template <typename F, typename V>
+auto VARIANT_INLINE apply_visitor(F&& visitor, V const& operand) -> decltype(V::visit(operand, std::forward<F>(visitor)))
+{
+    return V::visit(operand, std::forward<F>(visitor));
+}
+
+// Unary, mutable operand.
+template <typename F, typename V>
+auto VARIANT_INLINE apply_visitor(F&& visitor, V& operand) -> decltype(V::visit(operand, std::forward<F>(visitor)))
+{
+    return V::visit(operand, std::forward<F>(visitor));
+}
+
+// Binary, const operands.
+template <typename F, typename V>
+auto VARIANT_INLINE apply_visitor(F&& visitor, V const& lhs, V const& rhs) -> decltype(V::binary_visit(lhs, rhs, std::forward<F>(visitor)))
+{
+    return V::binary_visit(lhs, rhs, std::forward<F>(visitor));
+}
+
+// Binary, mutable operands.
+template <typename F, typename V>
+auto VARIANT_INLINE apply_visitor(F&& visitor, V& lhs, V& rhs) -> decltype(V::binary_visit(lhs, rhs, std::forward<F>(visitor)))
+{
+    return V::binary_visit(lhs, rhs, std::forward<F>(visitor));
+}
+
+// Free-function accessors that forward to the variant's member templates.
+// get_unchecked<R>(v) performs no type check; get<R>(v) exists only when
+// HAS_EXCEPTIONS is defined and checks the active alternative.
+
+template <typename ResultType, typename T>
+ResultType& get_unchecked(T& operand)
+{
+    return operand.template get_unchecked<ResultType>();
+}
+
+template <typename ResultType, typename T>
+ResultType const& get_unchecked(T const& operand)
+{
+    return operand.template get_unchecked<ResultType>();
+}
+
+#ifdef HAS_EXCEPTIONS
+template <typename ResultType, typename T>
+auto get(T& operand) -> decltype(operand.template get<ResultType>())
+{
+    return operand.template get<ResultType>();
+}
+
+template <typename ResultType, typename T>
+auto get(T const& operand) -> decltype(operand.template get<ResultType>())
+{
+    return operand.template get<ResultType>();
+}
+#endif
+// variant_size<V>::value is the number of alternatives of variant type V.
+// cv-qualified variant types report the same count as the unqualified type.
+template <typename T>
+struct variant_size;
+
+// A `variant_size_v` variable template would require C++14, so it is not
+// provided:
+// template <typename T>
+// constexpr std::size_t variant_size_v = variant_size<T>::value;
+
+template <typename T>
+struct variant_size<const T> : variant_size<T> {};
+
+template <typename T>
+struct variant_size<volatile T> : variant_size<T> {};
+
+template <typename T>
+struct variant_size<const volatile T> : variant_size<T> {};
+
+template <typename... Types>
+struct variant_size<variant<Types...>> : std::integral_constant<std::size_t, sizeof...(Types)> {};
+
+// variant_alternative<Index, V>::type is the Index-th (zero-based)
+// alternative of variant type V; an out-of-range Index fails to compile.
+template <std::size_t Index, typename T>
+struct variant_alternative;
+
+// Prefer clang's __type_pack_element builtin when available; it looks the
+// type up directly instead of recursing through the portable fallback below.
+#if defined(__clang__)
+#if __has_builtin(__type_pack_element)
+#define has_type_pack_element
+#endif
+#endif
+
+#if defined(has_type_pack_element)
+template <std::size_t Index, typename ...Types>
+struct variant_alternative<Index, variant<Types...>>
+{
+    static_assert(sizeof...(Types) > Index , "Index out of range");
+    using type = __type_pack_element<Index, Types...>;
+};
+#else
+// Portable fallback: peel one alternative per level until Index reaches 0.
+template <std::size_t Index, typename First, typename...Types>
+struct variant_alternative<Index, variant<First, Types...>>
+    : variant_alternative<Index - 1, variant<Types...>>
+{
+    static_assert(sizeof...(Types) > Index -1 , "Index out of range");
+};
+
+template <typename First, typename...Types>
+struct variant_alternative<0, variant<First, Types...>>
+{
+    using type = First;
+};
+
+#endif
+
+// The helper macro above has a short, lowercase, unprefixed name; undefine
+// it so it does not leak into every translation unit including this header.
+#undef has_type_pack_element
+
+template <size_t Index, typename T>
+using variant_alternative_t = typename variant_alternative<Index, T>::type;
+
+// cv-qualified variants: ::type is the selected alternative carrying the
+// same cv-qualification (e.g. index 0 of `const variant<int, char>` is
+// `const int`). The qualifier must be applied to the alternative's ::type,
+// not to the variant_alternative trait struct itself.
+template <size_t Index, typename T>
+struct variant_alternative<Index, const T>
+    : std::add_const<typename variant_alternative<Index, T>::type> {};
+
+template <size_t Index, typename T>
+struct variant_alternative<Index, volatile T>
+    : std::add_volatile<typename variant_alternative<Index, T>::type> {};
+
+template <size_t Index, typename T>
+struct variant_alternative<Index, const volatile T>
+    : std::add_cv<typename variant_alternative<Index, T>::type> {};
+
+} // namespace util
+} // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_H
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/util/variant/CMakeLists.txt
similarity index 69%
copy from cpp/src/arrow/compute/CMakeLists.txt
copy to cpp/src/arrow/util/variant/CMakeLists.txt
index 4589afb..0ebb251 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/util/variant/CMakeLists.txt
@@ -15,24 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Headers: top level
-install(FILES
-  api.h
-  cast.h
-  context.h
-  kernel.h
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/compute")
-
-# pkg-config support
-configure_file(arrow-compute.pc.in
-  "${CMAKE_CURRENT_BINARY_DIR}/arrow-compute.pc"
-  @ONLY)
-install(
-  FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-compute.pc"
-  DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
-
 #######################################
-# Unit tests
+# arrow_util_variant
 #######################################
 
-ADD_ARROW_TEST(compute-test)
+# Install the public headers of the vendored variant library. Use the
+# GNUInstallDirs include directory, as the other Arrow header installs do,
+# instead of hard-coding "include".
+install(FILES
+  optional.h
+  recursive_wrapper.h
+  variant_cast.h
+  variant_io.h
+  variant_visitor.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/util/variant")
diff --git a/cpp/src/arrow/util/variant/optional.h b/cpp/src/arrow/util/variant/optional.h
new file mode 100644
index 0000000..4c66710
--- /dev/null
+++ b/cpp/src/arrow/util/variant/optional.h
@@ -0,0 +1,100 @@
+// Copyright (c) MapBox
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this
+//   list of conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this
+//   list of conditions and the following disclaimer in the documentation and/or
+//   other materials provided with the distribution.
+// - Neither the name "MapBox" nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef ARROW_UTIL_VARIANT_OPTIONAL_H
+#define ARROW_UTIL_VARIANT_OPTIONAL_H
+
+#pragma message("This implementation of optional is deprecated. See https://github.com/mapbox/variant/issues/64.")
+
+#include <type_traits>
+#include <utility>
+
+#include <arrow/util/variant.h>
+
+namespace arrow {
+namespace util {
+
+// Minimal optional<T> built on variant<none_type, T>: the optional is
+// engaged exactly when the variant holds a T.
+template <typename T>
+class optional
+{
+    static_assert(!std::is_reference<T>::value, "optional doesn't support references");
+
+    // Tag alternative meaning "no value". It is the first alternative, so a
+    // default-constructed variant_ (and hence optional) starts out empty.
+    struct none_type
+    {
+    };
+
+    variant<none_type, T> variant_;
+
+public:
+    // Constructs an empty optional.
+    optional() = default;
+
+    // Copies rhs's state (empty or engaged).
+    optional(optional const& rhs)
+    {
+        if (this != &rhs)
+        { // protect against invalid self-assignment
+            variant_ = rhs.variant_;
+        }
+    }
+
+    // Constructs an engaged optional holding a copy of v.
+    optional(T const& v) { variant_ = v; }
+
+    // True when a value is present.
+    explicit operator bool() const noexcept { return variant_.template is<T>(); }
+
+    // Access the contained value via variant::get (checked access).
+    T const& get() const { return variant_.template get<T>(); }
+    T& get() { return variant_.template get<T>(); }
+
+    // Dereference. The non-const overload must return a reference: returning
+    // T by value made `*opt = x` silently modify a temporary copy.
+    T const& operator*() const { return this->get(); }
+    T& operator*() { return this->get(); }
+
+    // Replaces the contents with a copy of v (engages the optional).
+    optional& operator=(T const& v)
+    {
+        variant_ = v;
+        return *this;
+    }
+
+    optional& operator=(optional const& rhs)
+    {
+        if (this != &rhs)
+        {
+            variant_ = rhs.variant_;
+        }
+        return *this;
+    }
+
+    // Replaces the contents with T{args...}.
+    template <typename... Args>
+    void emplace(Args&&... args)
+    {
+        variant_ = T{std::forward<Args>(args)...};
+    }
+
+    // Disengages the optional.
+    void reset() { variant_ = none_type{}; }
+
+}; // class optional
+
+} // namespace util
+} // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_OPTIONAL_H
diff --git a/cpp/src/arrow/util/variant/recursive_wrapper.h b/cpp/src/arrow/util/variant/recursive_wrapper.h
new file mode 100644
index 0000000..c9d9385
--- /dev/null
+++ b/cpp/src/arrow/util/variant/recursive_wrapper.h
@@ -0,0 +1,122 @@
+#ifndef ARROW_UTIL_VARIANT_RECURSIVE_WRAPPER_H
+#define ARROW_UTIL_VARIANT_RECURSIVE_WRAPPER_H
+
+// Based on variant/recursive_wrapper.h from boost.
+//
+// Original license:
+//
+// Copyright (c) 2002-2003
+// Eric Friedman, Itay Maman
+//
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <cassert>
+#include <utility>
+
+namespace arrow {
+namespace util {
+
+// Owns a heap-allocated T so that a variant can (indirectly) contain itself.
+// Every constructor allocates, so a wrapper never holds a null pointer;
+// copy assignment assigns through to the existing pointee, while move
+// assignment exchanges pointers.
+template <typename T>
+class recursive_wrapper
+{
+public:
+    using type = T;
+
+    /**
+     * Heap-allocates a default-initialized T. For POD types this means the
+     * pointee's storage is left uninitialized.
+     *
+     * @throws std::bad_alloc if there is insufficient memory for an object
+     *         of type T.
+     * @throws any exception thrown by the default constructor of T.
+     */
+    recursive_wrapper()
+        : p_(new T)
+    {
+    }
+
+    // Deletes the owned object.
+    ~recursive_wrapper() noexcept { delete p_; }
+
+    // Copying allocates a fresh T holding a copy of the source value.
+    recursive_wrapper(recursive_wrapper const& operand)
+        : p_(new T(operand.get()))
+    {
+    }
+
+    recursive_wrapper(T const& operand)
+        : p_(new T(operand))
+    {
+    }
+
+    // Move construction also allocates: the source keeps its own
+    // (moved-from) T rather than being left with a null pointer.
+    recursive_wrapper(recursive_wrapper&& operand)
+        : p_(new T(std::move(operand.get())))
+    {
+    }
+
+    recursive_wrapper(T&& operand)
+        : p_(new T(std::move(operand)))
+    {
+    }
+
+    recursive_wrapper& operator=(recursive_wrapper const& rhs)
+    {
+        assign(rhs.get());
+        return *this;
+    }
+
+    recursive_wrapper& operator=(T const& rhs)
+    {
+        assign(rhs);
+        return *this;
+    }
+
+    // Exchanges the owned pointers; rhs takes over (and eventually deletes)
+    // the object previously held by *this.
+    recursive_wrapper& operator=(recursive_wrapper&& rhs) noexcept
+    {
+        swap(rhs);
+        return *this;
+    }
+
+    recursive_wrapper& operator=(T&& rhs)
+    {
+        this->get() = std::move(rhs);
+        return *this;
+    }
+
+    // Exchanges the owned pointers without touching the pointees.
+    void swap(recursive_wrapper& operand) noexcept
+    {
+        std::swap(p_, operand.p_);
+    }
+
+    T& get()
+    {
+        assert(p_);
+        return *get_pointer();
+    }
+
+    T const& get() const
+    {
+        assert(p_);
+        return *get_pointer();
+    }
+
+    T* get_pointer() { return p_; }
+
+    const T* get_pointer() const { return p_; }
+
+    // Implicit conversions to the wrapped value.
+    operator T const&() const { return this->get(); }
+
+    operator T&() { return this->get(); }
+
+private:
+    // Assigns through to the existing pointee (no reallocation).
+    void assign(T const& rhs)
+    {
+        this->get() = rhs;
+    }
+
+    // Owned heap object; released by the destructor.
+    T* p_;
+
+}; // class recursive_wrapper
+
+// Non-member swap, found by argument-dependent lookup; exchanges the
+// owned pointers of the two wrappers.
+template <typename T>
+inline void swap(recursive_wrapper<T>& lhs, recursive_wrapper<T>& rhs) noexcept
+{
+    rhs.swap(lhs);
+}
+} // namespace util
+} // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_RECURSIVE_WRAPPER_H
diff --git a/cpp/src/arrow/util/variant/variant_cast.h b/cpp/src/arrow/util/variant/variant_cast.h
new file mode 100644
index 0000000..558f1d9
--- /dev/null
+++ b/cpp/src/arrow/util/variant/variant_cast.h
@@ -0,0 +1,112 @@
+// Copyright (c) MapBox
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this
+//   list of conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this
+//   list of conditions and the following disclaimer in the documentation and/or
+//   other materials provided with the distribution.
+// - Neither the name "MapBox" nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef ARROW_UTIL_VARIANT_CAST_H
+#define ARROW_UTIL_VARIANT_CAST_H
+
+#include <type_traits>
+#include <typeinfo>
+
+namespace arrow {
+namespace util {
+
+namespace detail {
+
+// Visitor that static_casts whichever alternative is active to T&.
+// static_cast performs no runtime check, so the conversion must be valid for
+// the alternative actually held.
+template <class T>
+class static_caster
+{
+public:
+    template <class V>
+    T& operator()(V& v) const
+    {
+        return static_cast<T&>(v);
+    }
+};
+
+// Visitor that dynamic_casts the active alternative to T&.
+// A non-polymorphic alternative cannot be dynamic_cast, so that overload
+// throws std::bad_cast (declared in <typeinfo>) unconditionally.
+template <class T>
+class dynamic_caster
+{
+public:
+    using result_type = T&;
+    template <class V>
+    T& operator()(V& /*v*/, typename std::enable_if<!std::is_polymorphic<V>::value>::type* = nullptr) const
+    {
+        throw std::bad_cast();
+    }
+    template <class V>
+    T& operator()(V& v, typename std::enable_if<std::is_polymorphic<V>::value>::type* = nullptr) const
+    {
+        return dynamic_cast<T&>(v);
+    }
+};
+
+// Pointer flavour: yields nullptr instead of throwing, both for
+// non-polymorphic alternatives and for failed dynamic_casts.
+template <class T>
+class dynamic_caster<T*>
+{
+public:
+    using result_type = T*;
+    template <class V>
+    T* operator()(V& /*v*/, typename std::enable_if<!std::is_polymorphic<V>::value>::type* = nullptr) const
+    {
+        return nullptr;
+    }
+    template <class V>
+    T* operator()(V& v, typename std::enable_if<std::is_polymorphic<V>::value>::type* = nullptr) const
+    {
+        return dynamic_cast<T*>(&v);
+    }
+};
+}  // namespace detail
+
+// Cast the active alternative of `v` with dynamic_cast. For a reference T a
+// non-polymorphic (or mismatching) alternative throws std::bad_cast; for a
+// pointer T the result is nullptr instead.
+template <class T, class V>
+auto dynamic_variant_cast(V& v) -> typename detail::dynamic_caster<T>::result_type
+{
+    using caster = detail::dynamic_caster<T>;
+    return arrow::util::apply_visitor(caster(), v);
+}
+
+// Const overload: casts to a const-qualified T.
+template <class T, class V>
+auto dynamic_variant_cast(const V& v) -> typename detail::dynamic_caster<const T>::result_type
+{
+    using caster = detail::dynamic_caster<const T>;
+    return arrow::util::apply_visitor(caster(), v);
+}
+
+// Cast the active alternative of `v` to T& with an unchecked static_cast.
+template <class T, class V>
+auto static_variant_cast(V& v) -> T&
+{
+    using caster = detail::static_caster<T>;
+    return arrow::util::apply_visitor(caster(), v);
+}
+
+// Const overload: yields a reference to const T.
+template <class T, class V>
+auto static_variant_cast(const V& v) -> const T&
+{
+    using caster = detail::static_caster<const T>;
+    return arrow::util::apply_visitor(caster(), v);
+}
+
+}  // namespace util
+}  // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_CAST_H
diff --git a/cpp/src/arrow/util/variant/variant_io.h b/cpp/src/arrow/util/variant/variant_io.h
new file mode 100644
index 0000000..5541a81
--- /dev/null
+++ b/cpp/src/arrow/util/variant/variant_io.h
@@ -0,0 +1,72 @@
+// Copyright (c) MapBox
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this
+//   list of conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this
+//   list of conditions and the following disclaimer in the documentation and/or
+//   other materials provided with the distribution.
+// - Neither the name "MapBox" nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef ARROW_UTIL_VARIANT_IO_H
+#define ARROW_UTIL_VARIANT_IO_H
+
+#include <iosfwd>
+
+#include <arrow/util/variant.h>
+
+namespace arrow {
+namespace util {
+
+namespace detail {
+// Visitor behind the variant operator<<: writes the active alternative into
+// the referenced output stream using that alternative's own operator<<.
+template <typename Out>
+class printer
+{
+public:
+    explicit printer(Out& stream)
+        : stream_(stream) {}
+
+    // The stream is held by reference, so assignment is disallowed.
+    printer& operator=(printer const&) = delete;
+
+    template <typename T>
+    void operator()(T const& value) const
+    {
+        stream_ << value;
+    }
+
+private:
+    Out& stream_;
+};
+} // namespace detail
+
+// Stream a variant by printing whichever alternative it currently holds.
+template <typename CharT, typename Traits, typename... Types>
+VARIANT_INLINE std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& out, variant<Types...> const& rhs)
+{
+    using stream_type = std::basic_ostream<CharT, Traits>;
+    detail::printer<stream_type> print_active(out);
+    apply_visitor(print_active, rhs);
+    return out;
+}
+
+} // namespace util
+} // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_IO_H
diff --git a/cpp/src/arrow/util/variant/variant_visitor.h b/cpp/src/arrow/util/variant/variant_visitor.h
new file mode 100644
index 0000000..66b1dfe
--- /dev/null
+++ b/cpp/src/arrow/util/variant/variant_visitor.h
@@ -0,0 +1,69 @@
+// Copyright (c) MapBox
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this
+//   list of conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this
+//   list of conditions and the following disclaimer in the documentation and/or
+//   other materials provided with the distribution.
+// - Neither the name "MapBox" nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef ARROW_UTIL_VARIANT_VISITOR_HPP
+#define ARROW_UTIL_VARIANT_VISITOR_HPP
+
+#include <utility>
+
+namespace arrow {
+namespace util {
+
+// visitor<F1, F2, ...> inherits from every callable Fi and re-exports each
+// Fi::operator(), forming a single overload set for use with apply_visitor.
+template <typename... Fns>
+struct visitor;
+
+// Base case: exactly one callable.
+template <typename Fn>
+struct visitor<Fn> : Fn
+{
+    using Fn::operator();
+
+    template <typename F>
+    visitor(F&& fn) : Fn(std::forward<F>(fn)) {}
+};
+
+// Recursive case: the first callable is a direct base, the remaining ones
+// come from visitor<Fns...>.
+template <typename Fn, typename... Fns>
+struct visitor<Fn, Fns...> : Fn, visitor<Fns...>
+{
+    using Fn::operator();
+    using visitor<Fns...>::operator();
+
+    template <typename F, typename... Fs>
+    visitor(F&& fn, Fs&&... rest)
+        : Fn(std::forward<F>(fn)),
+          visitor<Fns...>(std::forward<Fs>(rest)...) {}
+};
+
+// Build a visitor from the given callables, storing decayed copies (or
+// moved-in values) of each.
+template <typename... Fns>
+auto make_visitor(Fns&&... fns) -> visitor<typename std::decay<Fns>::type...>
+{
+    using result_visitor = visitor<typename std::decay<Fns>::type...>;
+    return result_visitor(std::forward<Fns>(fns)...);
+}
+
+} // namespace util
+} // namespace arrow
+
+#endif // ARROW_UTIL_VARIANT_VISITOR_HPP
diff --git a/cpp/src/arrow/visitor.cc b/cpp/src/arrow/visitor.cc
index 3739e89..47dba6c 100644
--- a/cpp/src/arrow/visitor.cc
+++ b/cpp/src/arrow/visitor.cc
@@ -90,7 +90,7 @@ TYPE_VISITOR_DEFAULT(Time32Type);
 TYPE_VISITOR_DEFAULT(Time64Type);
 TYPE_VISITOR_DEFAULT(TimestampType);
 TYPE_VISITOR_DEFAULT(IntervalType);
-TYPE_VISITOR_DEFAULT(DecimalType);
+TYPE_VISITOR_DEFAULT(Decimal128Type);
 TYPE_VISITOR_DEFAULT(ListType);
 TYPE_VISITOR_DEFAULT(StructType);
 TYPE_VISITOR_DEFAULT(UnionType);
diff --git a/cpp/src/arrow/visitor_inline.h b/cpp/src/arrow/visitor_inline.h
index 72c82a3..41b0108 100644
--- a/cpp/src/arrow/visitor_inline.h
+++ b/cpp/src/arrow/visitor_inline.h
@@ -97,7 +97,7 @@ inline Status VisitArrayInline(const Array& array, VISITOR* visitor) {
     ARRAY_VISIT_INLINE(TimestampType);
     ARRAY_VISIT_INLINE(Time32Type);
     ARRAY_VISIT_INLINE(Time64Type);
-    ARRAY_VISIT_INLINE(DecimalType);
+    ARRAY_VISIT_INLINE(Decimal128Type);
     ARRAY_VISIT_INLINE(ListType);
     ARRAY_VISIT_INLINE(StructType);
     ARRAY_VISIT_INLINE(UnionType);
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index db3b3aa..bf962bc 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -10,6 +10,12 @@ cpp/src/arrow/io/mman.h
 cpp/src/arrow/util/random.h
 cpp/src/arrow/status.cc
 cpp/src/arrow/status.h
+cpp/src/arrow/util/variant.h
+cpp/src/arrow/util/variant/optional.h
+cpp/src/arrow/util/variant/recursive_wrapper.h
+cpp/src/arrow/util/variant/variant_cast.h
+cpp/src/arrow/util/variant/variant_io.h
+cpp/src/arrow/util/variant/variant_visitor.h
 cpp/build-support/asan_symbolize.py
 cpp/build-support/cpplint.py
 cpp/build-support/clang_format_exclusions.txt
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index c520240..8f2f23d 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -132,6 +132,7 @@ Array Types
 .. autosummary::
    :toctree: generated/
 
+   array
    Array
    BooleanArray
    DictionaryArray
@@ -168,6 +169,8 @@ Tables and Record Batches
 .. autosummary::
    :toctree: generated/
 
+   column
+   chunked_array
    ChunkedArray
    Column
    RecordBatch
diff --git a/python/doc/source/development.rst b/python/doc/source/development.rst
index 7ef6a72..8856137 100644
--- a/python/doc/source/development.rst
+++ b/python/doc/source/development.rst
@@ -84,7 +84,7 @@ from conda-forge:
    conda create -y -q -n pyarrow-dev \
          python=3.6 numpy six setuptools cython pandas pytest \
          cmake flatbuffers rapidjson boost-cpp thrift-cpp snappy zlib \
-         brotli jemalloc lz4-c zstd -c conda-forge
+         gflags brotli jemalloc lz4-c zstd -c conda-forge
    source activate pyarrow-dev
 
 
@@ -256,17 +256,11 @@ First, starting from fresh clones of Apache Arrow and parquet-cpp:
 
 .. code-block:: shell
 
-   conda create -n arrow-dev cmake git boost-cpp ^
-         flatbuffers snappy zlib brotli thrift-cpp rapidjson ^
-         -c conda-forge
-   activate arrow-dev
-
-As one git housekeeping item, we must run this command in our Arrow clone:
-
-.. code-block:: shell
-
-   cd arrow
-   git config core.symlinks true
+   conda create -y -q -n pyarrow-dev ^
+         python=3.6 numpy six setuptools cython pandas pytest ^
+         cmake flatbuffers rapidjson boost-cpp thrift-cpp snappy zlib ^
+         gflags brotli lz4-c zstd -c conda-forge
+   activate pyarrow-dev
 
 Now, we build and install Arrow C++ libraries
 
@@ -280,7 +274,7 @@ Now, we build and install Arrow C++ libraries
          -DCMAKE_INSTALL_PREFIX=%ARROW_HOME% ^
          -DCMAKE_BUILD_TYPE=Release ^
          -DARROW_BUILD_TESTS=on ^
-         -DARROW_CXXFLAGS="/WX" ^
+         -DARROW_CXXFLAGS="/WX /MP" ^
          -DARROW_PYTHON=on ..
    cmake --build . --target INSTALL --config Release
    cd ..\..
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 09bf6b3..c8ded2d 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -43,7 +43,7 @@ from pyarrow.lib import (null, bool_,
                          Schema,
                          schema,
                          Array, Tensor,
-                         array,
+                         array, chunked_array, column,
                          from_numpy_dtype,
                          NullArray,
                          NumericArray, IntegerArray, FloatingPointArray,
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 2ef592f..cca9425 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -77,7 +77,7 @@ cdef _ndarray_to_array(object values, object mask, DataType type,
         return pyarrow_wrap_array(chunked_out.get().chunk(0))
 
 
-cdef DataType _ensure_type(object type):
+cdef inline DataType _ensure_type(object type):
     if type is None:
         return None
     elif not isinstance(type, DataType):
@@ -228,6 +228,15 @@ cdef CFunctionContext* _context() nogil:
     return _global_ctx.ctx.get()
 
 
+cdef wrap_datum(const CDatum& datum):
+    # Box a C++ Datum as the matching pyarrow object. Only array and
+    # chunked-array Datums are handled; any other kind raises ValueError.
+    cdef DatumType kind = datum.kind()
+    if kind == DatumType_ARRAY:
+        return pyarrow_wrap_array(MakeArray(datum.array()))
+    if kind == DatumType_CHUNKED_ARRAY:
+        return pyarrow_wrap_chunked_array(datum.chunked_array())
+    raise ValueError("Unable to wrap Datum in a Python object")
+
+
 cdef class Array:
 
     cdef void init(self, const shared_ptr[CArray]& sp_array):
@@ -270,6 +279,29 @@ cdef class Array:
 
         return pyarrow_wrap_array(result)
 
+    def unique(self):
+        """
+        Compute distinct elements in array
+
+        Null entries are not reported as a distinct element.
+
+        Returns
+        -------
+        unique : Array
+        """
+        cdef shared_ptr[CArray] c_distinct
+
+        with nogil:
+            check_status(
+                Unique(_context(), CDatum(self.sp_array), &c_distinct))
+
+        return pyarrow_wrap_array(c_distinct)
+
+    def dictionary_encode(self):
+        """
+        Compute dictionary-encoded representation of array
+
+        Each distinct value is stored once in the dictionary and every
+        element becomes an index into it; null entries stay null.
+
+        Returns
+        -------
+        encoded : DictionaryArray
+        """
+        cdef CDatum c_encoded
+
+        with nogil:
+            check_status(DictionaryEncode(_context(), CDatum(self.sp_array),
+                                          &c_encoded))
+
+        return wrap_datum(c_encoded)
+
     @staticmethod
     def from_pandas(obj, mask=None, type=None, MemoryPool memory_pool=None):
         """
@@ -702,6 +734,9 @@ cdef class DictionaryArray(Array):
             return box_scalar(dictionary.type, dictionary.sp_array,
                               index.as_py())
 
+    def dictionary_encode(self):
+        """
+        Return this array unchanged, since it is already dictionary-encoded
+        """
+        return self
+
     property dictionary:
 
         def __get__(self):
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 11cc6b3..dbfd89c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -90,6 +90,14 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         c_string ToString()
 
+    # Declaration of arrow::ArrayData: the type, length, null count, offset,
+    # buffers and child data behind an Array (CArray.data() returns one).
+    # MakeArray turns an ArrayData back into an Array.
+    cdef cppclass CArrayData" arrow::ArrayData":
+        shared_ptr[CDataType] type
+        int64_t length
+        int64_t null_count
+        int64_t offset
+        vector[shared_ptr[CBuffer]] buffers
+        vector[shared_ptr[CArrayData]] child_data
+
     cdef cppclass CArray" arrow::Array":
         shared_ptr[CDataType] type()
 
@@ -102,9 +110,13 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         c_bool Equals(const CArray& arr)
         c_bool IsNull(int i)
 
+        shared_ptr[CArrayData] data()
+
         shared_ptr[CArray] Slice(int64_t offset)
         shared_ptr[CArray] Slice(int64_t offset, int64_t length)
 
+    # Construct an Array object from its ArrayData
+    shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
+
     CStatus DebugPrint(const CArray& arr, int indent)
 
     cdef cppclass CFixedWidthType" arrow::FixedWidthType"(CDataType):
@@ -363,6 +375,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     CStatus ValidateArray(const CArray& array)
 
     cdef cppclass CChunkedArray" arrow::ChunkedArray":
+        CChunkedArray(const vector[shared_ptr[CArray]]& arrays)
         int64_t length()
         int64_t null_count()
         int num_chunks()
@@ -376,8 +389,13 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CColumn(const shared_ptr[CField]& field,
                 const vector[shared_ptr[CArray]]& chunks)
 
+        CColumn(const shared_ptr[CField]& field,
+                const shared_ptr[CChunkedArray]& data)
+
         c_bool Equals(const CColumn& other)
 
+        shared_ptr[CField] field()
+
         int64_t length()
         int64_t null_count()
         const c_string& name()
@@ -776,11 +794,42 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
         c_bool allow_int_overflow
         c_bool allow_time_truncate
 
+    # Values of arrow::compute::Datum::type, i.e. which kind of value a Datum
+    # holds; returned by CDatum.kind()
+    enum DatumType" arrow::compute::Datum::type":
+        DatumType_NONE" arrow::compute::Datum::NONE"
+        DatumType_SCALAR" arrow::compute::Datum::SCALAR"
+        DatumType_ARRAY" arrow::compute::Datum::ARRAY"
+        DatumType_CHUNKED_ARRAY" arrow::compute::Datum::CHUNKED_ARRAY"
+        DatumType_RECORD_BATCH" arrow::compute::Datum::RECORD_BATCH"
+        DatumType_TABLE" arrow::compute::Datum::TABLE"
+        DatumType_COLLECTION" arrow::compute::Datum::COLLECTION"
+
+    # arrow::compute::Datum: input/output value of compute kernels, built
+    # from an Array, ChunkedArray, RecordBatch or Table. kind() tells which
+    # alternative is held. NOTE(review): array() / chunked_array() are
+    # presumably only meaningful when kind() is DatumType_ARRAY /
+    # DatumType_CHUNKED_ARRAY respectively -- confirm against the C++ header.
+    cdef cppclass CDatum" arrow::compute::Datum":
+        CDatum()
+        CDatum(const shared_ptr[CArray]& value)
+        CDatum(const shared_ptr[CChunkedArray]& value)
+        CDatum(const shared_ptr[CRecordBatch]& value)
+        CDatum(const shared_ptr[CTable]& value)
+
+        DatumType kind()
+
+        shared_ptr[CArrayData] array()
+        shared_ptr[CChunkedArray] chunked_array()
+
     CStatus Cast(CFunctionContext* context, const CArray& array,
                  const shared_ptr[CDataType]& to_type,
                  const CCastOptions& options,
                  shared_ptr[CArray]* out)
 
+    # Datum overload of Cast: same options as the Array overload, but the
+    # input and the result are Datums
+    CStatus Cast(CFunctionContext* context, const CDatum& value,
+                 const shared_ptr[CDataType]& to_type,
+                 const CCastOptions& options, CDatum* out)
+
+    # Compute the distinct values of `value`, returned in `out` as an Array
+    CStatus Unique(CFunctionContext* context, const CDatum& value,
+                   shared_ptr[CArray]* out)
+
+    # Compute the dictionary-encoded form of `value`; the caller must inspect
+    # out.kind() to know what kind of Datum was produced
+    CStatus DictionaryEncode(CFunctionContext* context, const CDatum& value,
+                             CDatum* out)
+
 
 cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     shared_ptr[CDataType] GetPrimitiveType(Type type)
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 41eaf0b..0aab9a4 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -362,7 +362,8 @@ def backwards_compatible_index_name(raw_name, logical_name):
         return logical_name
 
 
-def table_to_blockmanager(options, table, memory_pool, nthreads=1):
+def table_to_blockmanager(options, table, memory_pool, nthreads=1,
+                          categoricals=None):
     import pandas.core.internals as _int
     import pyarrow.lib as lib
 
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 5ba5f83..1a9d23d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -108,6 +108,15 @@ cdef class ChunkedArray:
 
         return pyarrow_wrap_array(self.chunked_array.chunk(i))
 
+    property chunks:
+
+        def __get__(self):
+            """
+            List of all chunks of this ChunkedArray, in order
+            """
+            return [self.chunk(i) for i in range(self.num_chunks)]
+
     def iterchunks(self):
         for i in range(self.num_chunks):
             yield self.chunk(i)
@@ -122,6 +131,74 @@ cdef class ChunkedArray:
         return result
 
 
+def chunked_array(arrays, type=None):
+    """
+    Construct chunked array from list of array-like objects
+
+    Parameters
+    ----------
+    arrays : list of Array or values coercible to arrays
+        Must contain at least one element
+    type : DataType or string coercible to DataType
+        If passed, Array elements must already have this type and other
+        elements are converted to it
+
+    Returns
+    -------
+    ChunkedArray
+
+    Raises
+    ------
+    ValueError
+        If arrays is empty, or an Array element's type differs from type
+    """
+    cdef:
+        Array arr
+        DataType c_type
+        vector[shared_ptr[CArray]] c_arrays
+        shared_ptr[CChunkedArray] sp_chunked_array
+
+    # Normalize the requested type once, so Array elements are compared
+    # against a DataType rather than against an unconverted alias
+    c_type = _ensure_type(type)
+
+    for x in arrays:
+        if isinstance(x, Array):
+            arr = x
+            # Explicit check rather than assert, which is skipped under -O
+            if c_type is not None and arr.type != c_type:
+                raise ValueError('Array type {0} does not match passed '
+                                 'type {1}'.format(arr.type, c_type))
+        else:
+            arr = array(x, type=c_type)
+
+        c_arrays.push_back(arr.sp_array)
+
+    if c_arrays.size() == 0:
+        # The CChunkedArray constructor only receives the chunks, so with no
+        # chunks there is nothing to derive the type from
+        raise ValueError('Must pass at least one array')
+
+    sp_chunked_array.reset(new CChunkedArray(c_arrays))
+    return pyarrow_wrap_chunked_array(sp_chunked_array)
+
+
+def column(object field_or_name, arr):
+    """
+    Create Column object from field/string and array-like data
+
+    Parameters
+    ----------
+    field_or_name : Field or str
+        A Field must have the same type as the data; otherwise a new field
+        with this name and the data's type is created
+    arr : Array, ChunkedArray, list of chunks, or array-like
+        A Python list is interpreted as a list of chunks (see chunked_array)
+
+    Returns
+    -------
+    Column
+    """
+    cdef:
+        Field c_field
+        Array c_arr
+        ChunkedArray c_chunked
+        shared_ptr[CColumn] sp_column
+
+    # Coerce the data: a list holds chunks, anything else that is not
+    # already Arrow data becomes a single Array
+    if isinstance(arr, list):
+        arr = chunked_array(arr)
+    elif not isinstance(arr, (Array, ChunkedArray)):
+        arr = array(arr)
+
+    if not isinstance(field_or_name, Field):
+        c_field = field(field_or_name, arr.type)
+    else:
+        c_field = field_or_name
+        if arr.type != c_field.type:
+            raise ValueError('Passed field type does not match array')
+
+    if isinstance(arr, ChunkedArray):
+        c_chunked = arr
+        sp_column.reset(new CColumn(c_field.sp_field,
+                                    c_chunked.sp_chunked_array))
+    elif isinstance(arr, Array):
+        c_arr = arr
+        sp_column.reset(new CColumn(c_field.sp_field, c_arr.sp_array))
+    else:
+        raise ValueError("Unsupported type for column(...): {}"
+                         .format(type(arr)))
+
+    return pyarrow_wrap_column(sp_column)
+
+
 cdef class Column:
     """
     Named vector of elements of equal type.
@@ -143,25 +220,47 @@ cdef class Column:
         result = StringIO()
         result.write(object.__repr__(self))
         data = self.data
-        for i in range(len(data)):
-            result.write('\nchunk {0}: {1}'.format(i, repr(data.chunk(0))))
+        for i, chunk in enumerate(data.chunks):
+            result.write('\nchunk {0}: {1}'.format(i, repr(chunk)))
 
         return result.getvalue()
 
     @staticmethod
-    def from_array(object field_or_name, Array arr):
-        cdef Field boxed_field
+    def from_array(field_or_name, arr):
+        """
+        Create Column from field/string and array-like data
+
+        Equivalent to column(field_or_name, arr); explicit parameters keep
+        keyword calls such as from_array(field_or_name=..., arr=...) working.
+        """
+        return column(field_or_name, arr)
 
-        if isinstance(field_or_name, Field):
-            boxed_field = field_or_name
-            if arr.type != boxed_field.type:
-                raise ValueError('Passed field type does not match array')
-        else:
-            boxed_field = field(field_or_name, arr.type)
+    def cast(self, object target_type, safe=True):
+        """
+        Cast column values to another data type
+
+        Parameters
+        ----------
+        target_type : DataType
+            Type to cast to
+        safe : boolean, default True
+            Check for overflows or other unsafe conversions
+
+        Returns
+        -------
+        casted : Column
+            Column with the same name holding the converted values
+
+        Raises
+        ------
+        TypeError
+            If target_type is None
+        """
+        cdef:
+            CCastOptions options
+            DataType c_type
+            CDatum out
+
+        c_type = _ensure_type(target_type)
+        if c_type is None:
+            # _ensure_type maps None to None; c_type.sp_type below would then
+            # dereference None without any check
+            raise TypeError('target_type must not be None')
 
-        cdef shared_ptr[CColumn] sp_column
-        sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
-        return pyarrow_wrap_column(sp_column)
+        options.allow_int_overflow = not safe
+        options.allow_time_truncate = not safe
+
+        with nogil:
+            check_status(Cast(_context(), CDatum(self.column.data()),
+                              c_type.sp_type, options, &out))
+
+        # NOTE(review): assumes Cast yields a chunked-array Datum for the
+        # chunked column input
+        casted_data = pyarrow_wrap_chunked_array(out.chunked_array())
+        return column(self.name, casted_data)
 
     def to_pandas(self, strings_to_categorical=False, zero_copy_only=False):
         """
@@ -242,6 +341,10 @@ cdef class Column:
         return self.column.length()
 
     @property
+    def field(self):
+        """
+        The Field (name and type) describing this column
+        """
+        return pyarrow_wrap_field(self.column.field())
+
+    @property
     def shape(self):
         """
         Dimensions of this columns
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 7dc93c2..c061e68 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -295,6 +295,18 @@ def test_cast_integers_safe():
             in_arr.cast(out_type)
 
 
+def test_cast_column():
+    arrays = [pa.array([1, 2, 3]), pa.array([4, 5, 6])]
+
+    col = pa.column('foo', arrays)
+
+    target = pa.float64()
+    casted = col.cast(target)
+
+    expected = pa.column('foo', [x.cast(target) for x in arrays])
+    assert casted.equals(expected)
+
+
 def test_cast_integers_unsafe():
     # We let NumPy do the unsafe casting
     unsafe_cases = [
@@ -350,6 +362,33 @@ def test_cast_signed_to_unsigned():
         _check_cast_case(case)
 
 
+def test_unique_simple():
+    cases = [
+        (pa.array([1, 2, 3, 1, 2, 3]), pa.array([1, 2, 3])),
+        (pa.array(['foo', None, 'bar', 'foo']),
+         pa.array(['foo', 'bar']))
+    ]
+    for arr, expected in cases:
+        result = arr.unique()
+        assert result.equals(expected)
+
+
+def test_dictionary_encode_simple():
+    cases = [
+        (pa.array([1, 2, 3, None, 1, 2, 3]),
+         pa.DictionaryArray.from_arrays(
+             pa.array([0, 1, 2, None, 0, 1, 2], type='int32'),
+             [1, 2, 3])),
+        (pa.array(['foo', None, 'bar', 'foo']),
+         pa.DictionaryArray.from_arrays(
+             pa.array([0, None, 1, 0], type='int32'),
+             ['foo', 'bar']))
+    ]
+    for arr, expected in cases:
+        result = arr.dictionary_encode()
+        assert result.equals(expected)
+
+
 def test_simple_type_construction():
     result = pa.lib.TimestampType()
     with pytest.raises(TypeError):
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 6ba4fd2..1df80ac 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1436,6 +1436,7 @@ def test_large_table_int32_overflow():
     _write_table(table, f)
 
 
+@parquet
 def test_index_column_name_duplicate(tmpdir):
     data = {
         'close': {
@@ -1460,6 +1461,7 @@ def test_index_column_name_duplicate(tmpdir):
     tm.assert_frame_equal(result_df, dfx)
 
 
+@parquet
 def test_backwards_compatible_index_naming():
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
@@ -1482,6 +1484,7 @@ carat        cut  color  clarity  depth  table  price     x     y     z
     tm.assert_frame_equal(result, expected)
 
 
+@parquet
 def test_backwards_compatible_index_multi_level_named():
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
@@ -1507,6 +1510,7 @@ carat        cut  color  clarity  depth  table  price     x     y     z
     tm.assert_frame_equal(result, expected)
 
 
+@parquet
 def test_backwards_compatible_index_multi_level_some_named():
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index edf0d8a..abd678b 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -1108,6 +1108,7 @@ def union(children_fields, mode):
 
     return pyarrow_wrap_data_type(union_type)
 
+
 cdef dict _type_aliases = {
     'null': null,
     'i1': int8,

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].