You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/04/17 14:47:40 UTC

[arrow] branch master updated: ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b496913  ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl
b496913 is described below

commit b496913e5ddb7d8bbef65a59be5a54ac14bf6740
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Apr 17 16:47:31 2019 +0200

    ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl
    
    - don't use in-situ parsing
    - remove UnsafeStringBuilder (which was only useful when parsing in-situ)
    - don't require null termination of parsed buffers
    - resize parser's scalar storage when it might overflow
    - rewrite chunker to use Buffer instead of string_view
      to represent memory ranges
    - iwyu + lint cleanup
    - add test for parsing JSON with a partial schema
    - add test for inferring timestamp type in an unexpected
      field of strings
    - refactor rapidjson defines into a single header
    - correct SSE detection
    - disable MSVC conversion errors (to match GCC and Clang options)
    - allow ArrayFromJSON to parse timestamps from strings
    
    Author: Benjamin Kietzman <be...@gmail.com>
    
    Closes #4148 from bkietz/4708-Add-multithreaded-JSON-reader and squashes the following commits:
    
    52edad826 <Benjamin Kietzman> include arrow/utils/macros.h for ARROW_BITNESS
    d3503250f <Benjamin Kietzman> #include sse-utils in rapidjson-def for sse macros
    aa613438a <Benjamin Kietzman> correct ConsumeWhitespace for non-SIMD case
    e62ce02a6 <Benjamin Kietzman> use arrow sse macros
    673ae342e <Benjamin Kietzman> add init for error case to make MSVC happy
    e005a89c3 <Benjamin Kietzman> correct SSE detection
    11ba680ec <Benjamin Kietzman> refactor rapidjson defines to a single header
    cb638a6f5 <Benjamin Kietzman> address review comments
    ee8c5e5d4 <Benjamin Kietzman> functions in source files should be static
    a8fd17268 <Benjamin Kietzman> disable conversion warnings for MSVC
    866af6d54 <Benjamin Kietzman> add iwyu includes, use pragma once instead of guards
    2c990bf1f <Benjamin Kietzman> remove redundant KindFromTag()
    058607bca <Benjamin Kietzman> fix build error
    ab27659a5 <Benjamin Kietzman> refactoring JSON parser to prepare for multithreaded impl
---
 cpp/cmake_modules/SetupCxxFlags.cmake          |   4 +-
 cpp/src/arrow/buffer.h                         |  29 ++-
 cpp/src/arrow/ipc/json-internal.h              |  13 +-
 cpp/src/arrow/ipc/json-simple-test.cc          |  13 +
 cpp/src/arrow/ipc/json-simple.cc               | 167 ++++++++-----
 cpp/src/arrow/json/api.h                       |   5 +-
 cpp/src/arrow/json/chunker-test.cc             | 138 +++++------
 cpp/src/arrow/json/chunker.cc                  | 143 ++++++-----
 cpp/src/arrow/json/chunker.h                   |  31 ++-
 cpp/src/arrow/json/options.h                   |   9 +-
 cpp/src/arrow/json/parser-benchmark.cc         |  74 +++++-
 cpp/src/arrow/json/parser-test.cc              |  93 ++------
 cpp/src/arrow/json/parser.cc                   | 314 +++++++++++++------------
 cpp/src/arrow/json/parser.h                    |  97 +++-----
 cpp/src/arrow/json/{api.h => rapidjson-defs.h} |  30 ++-
 cpp/src/arrow/json/reader.cc                   |  51 ++--
 cpp/src/arrow/json/reader.h                    |  22 +-
 cpp/src/arrow/json/test-common.h               |  71 ++++--
 cpp/src/arrow/type_traits.h                    |  81 ++++---
 cpp/src/arrow/util/parsing.h                   |  40 +++-
 cpp/src/arrow/util/sse-util.h                  |  14 +-
 21 files changed, 780 insertions(+), 659 deletions(-)

diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 3ce2dc8..8cb2585 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -138,8 +138,8 @@ endmacro()
 if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
   # Pre-checkin builds
   if("${COMPILER_FAMILY}" STREQUAL "msvc")
-    string(REPLACE "/W3" "" CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS}")
-    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3")
+    # https://docs.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warnings-by-compiler-version
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3 /wd4365 /wd4267 /wd4838")
   elseif("${COMPILER_FAMILY}" STREQUAL "clang")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Weverything -Wno-c++98-compat \
 -Wno-c++98-compat-pedantic -Wno-deprecated -Wno-weak-vtables -Wno-padded \
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 1f1cd4b..20a7969 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -29,6 +29,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -62,14 +63,14 @@ class ARROW_EXPORT Buffer {
         size_(size),
         capacity_(size) {}
 
-  /// \brief Construct from std::string without copying memory
+  /// \brief Construct from string_view without copying memory
   ///
-  /// \param[in] data a std::string object
+  /// \param[in] data a string_view object
   ///
-  /// \note The std::string must stay alive for the lifetime of the Buffer, so
-  /// temporary rvalue strings must be stored in an lvalue somewhere
-  explicit Buffer(const std::string& data)
-      : Buffer(reinterpret_cast<const uint8_t*>(data.c_str()),
+  /// \note The memory viewed by data must not be deallocated in the lifetime of the
+  /// Buffer; temporary rvalue strings must be stored in an lvalue somewhere
+  explicit Buffer(util::string_view data)
+      : Buffer(reinterpret_cast<const uint8_t*>(data.data()),
                static_cast<int64_t>(data.size())) {}
 
   virtual ~Buffer() = default;
@@ -161,6 +162,12 @@ class ARROW_EXPORT Buffer {
   /// \note Can throw std::bad_alloc if buffer is large
   std::string ToString() const;
 
+  /// \brief View buffer contents as a util::string_view
+  /// \return util::string_view
+  explicit operator util::string_view() const {
+    return util::string_view(reinterpret_cast<const char*>(data_), size_);
+  }
+
   /// \brief Return a pointer to the buffer's data
   const uint8_t* data() const { return data_; }
   /// \brief Return a writable pointer to the buffer's data
@@ -230,6 +237,16 @@ ARROW_EXPORT
 std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
                                            const int64_t offset, const int64_t length);
 
+/// \brief Like SliceBuffer, but construct a mutable buffer slice.
+///
+/// If the parent buffer is not mutable, behavior is undefined (it may abort
+/// in debug builds).
+static inline std::shared_ptr<Buffer> SliceMutableBuffer(
+    const std::shared_ptr<Buffer>& buffer, const int64_t offset) {
+  int64_t length = buffer->size() - offset;
+  return SliceMutableBuffer(buffer, offset, length);
+}
+
 /// @}
 
 /// \class MutableBuffer
diff --git a/cpp/src/arrow/ipc/json-internal.h b/cpp/src/arrow/ipc/json-internal.h
index c8c7249..b69c8bb 100644
--- a/cpp/src/arrow/ipc/json-internal.h
+++ b/cpp/src/arrow/ipc/json-internal.h
@@ -18,22 +18,11 @@
 #ifndef ARROW_IPC_JSON_INTERNAL_H
 #define ARROW_IPC_JSON_INTERNAL_H
 
-#define RAPIDJSON_HAS_STDSTRING 1
-#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
-#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
-
-#define RAPIDJSON_NAMESPACE arrow::rapidjson
-#define RAPIDJSON_NAMESPACE_BEGIN \
-  namespace arrow {               \
-  namespace rapidjson {
-#define RAPIDJSON_NAMESPACE_END \
-  }                             \
-  }
-
 #include <memory>
 #include <sstream>
 #include <string>
 
+#include "arrow/json/rapidjson-defs.h"
 #include "rapidjson/document.h"      // IWYU pragma: export
 #include "rapidjson/encodings.h"     // IWYU pragma: export
 #include "rapidjson/error/en.h"      // IWYU pragma: export
diff --git a/cpp/src/arrow/ipc/json-simple-test.cc b/cpp/src/arrow/ipc/json-simple-test.cc
index 1bb04a3..ad2f175 100644
--- a/cpp/src/arrow/ipc/json-simple-test.cc
+++ b/cpp/src/arrow/ipc/json-simple-test.cc
@@ -321,6 +321,19 @@ TEST(TestString, Basics) {
   AssertJSONArray<BinaryType, std::string>(type, "[\"\\u0000\\u001f\"]", {s});
 }
 
+TEST(TestTimestamp, Basics) {
+  // Timestamp type
+  auto type = timestamp(TimeUnit::SECOND);
+  AssertJSONArray<TimestampType, int64_t>(
+      type, R"(["1970-01-01","2000-02-29","3989-07-14","1900-02-28"])",
+      {0, 951782400, 63730281600LL, -2203977600LL});
+
+  type = timestamp(TimeUnit::NANO);
+  AssertJSONArray<TimestampType, int64_t>(
+      type, R"(["1970-01-01","2000-02-29","1900-02-28"])",
+      {0, 951782400000000000LL, -2203977600000000000LL});
+}
+
 TEST(TestString, Errors) {
   std::shared_ptr<DataType> type = utf8();
   std::shared_ptr<Array> array;
diff --git a/cpp/src/arrow/ipc/json-simple.cc b/cpp/src/arrow/ipc/json-simple.cc
index 2861bd7..29da85e 100644
--- a/cpp/src/arrow/ipc/json-simple.cc
+++ b/cpp/src/arrow/ipc/json-simple.cc
@@ -29,6 +29,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/parsing.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
@@ -91,8 +92,6 @@ class ConcreteConverter : public Converter {
   }
 };
 
-// TODO : dates and times?
-
 // ------------------------------------------------------------------------
 // Converter for null arrays
 
@@ -114,7 +113,7 @@ class NullConverter final : public ConcreteConverter<NullConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<NullBuilder> builder_;
 };
 
@@ -145,11 +144,67 @@ class BooleanConverter final : public ConcreteConverter<BooleanConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<BooleanBuilder> builder_;
 };
 
 // ------------------------------------------------------------------------
+// Helpers for numeric converters
+
+// Convert single signed integer value (also {Date,Time}{32,64} and Timestamp)
+template <typename T>
+typename std::enable_if<is_signed_integer<T>::value || is_date<T>::value ||
+                            is_time<T>::value || is_timestamp<T>::value,
+                        Status>::type
+ConvertNumber(const rj::Value& json_obj, typename T::c_type* out) {
+  if (json_obj.IsInt64()) {
+    int64_t v64 = json_obj.GetInt64();
+    *out = static_cast<typename T::c_type>(v64);
+    if (*out == v64) {
+      return Status::OK();
+    } else {
+      return Status::Invalid("Value ", v64, " out of bounds for ",
+                             TypeTraits<T>::type_singleton());
+    }
+  } else {
+    *out = static_cast<typename T::c_type>(0);
+    return JSONTypeError("signed int", json_obj.GetType());
+  }
+}
+
+// Convert single unsigned integer value
+template <typename T>
+enable_if_unsigned_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
+                                                    typename T::c_type* out) {
+  if (json_obj.IsUint64()) {
+    uint64_t v64 = json_obj.GetUint64();
+    *out = static_cast<typename T::c_type>(v64);
+    if (*out == v64) {
+      return Status::OK();
+    } else {
+      return Status::Invalid("Value ", v64, " out of bounds for ",
+                             TypeTraits<T>::type_singleton());
+    }
+  } else {
+    *out = static_cast<typename T::c_type>(0);
+    return JSONTypeError("unsigned int", json_obj.GetType());
+  }
+}
+
+// Convert single floating point value
+template <typename T>
+enable_if_floating_point<T, Status> ConvertNumber(const rj::Value& json_obj,
+                                                  typename T::c_type* out) {
+  if (json_obj.IsNumber()) {
+    *out = static_cast<typename T::c_type>(json_obj.GetDouble());
+    return Status::OK();
+  } else {
+    *out = static_cast<typename T::c_type>(0);
+    return JSONTypeError("number", json_obj.GetType());
+  }
+}
+
+// ------------------------------------------------------------------------
 // Converter for int arrays
 
 template <typename Type>
@@ -169,49 +224,14 @@ class IntegerConverter final : public ConcreteConverter<IntegerConverter<Type>>
     if (json_obj.IsNull()) {
       return AppendNull();
     }
-    return AppendNumber(json_obj);
+    c_type value;
+    RETURN_NOT_OK(ConvertNumber<Type>(json_obj, &value));
+    return builder_->Append(value);
   }
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
-  // Append signed integer value
-  template <typename Integer = c_type>
-  typename std::enable_if<std::is_signed<Integer>::value, Status>::type AppendNumber(
-      const rj::Value& json_obj) {
-    if (json_obj.IsInt64()) {
-      int64_t v64 = json_obj.GetInt64();
-      c_type v = static_cast<c_type>(v64);
-      if (v == v64) {
-        return builder_->Append(v);
-      } else {
-        return Status::Invalid("Value ", v64, " out of bounds for ",
-                               this->type_->ToString());
-      }
-    } else {
-      return JSONTypeError("signed int", json_obj.GetType());
-    }
-  }
-
-  // Append unsigned integer value
-  template <typename Integer = c_type>
-  typename std::enable_if<std::is_unsigned<Integer>::value, Status>::type AppendNumber(
-      const rj::Value& json_obj) {
-    if (json_obj.IsUint64()) {
-      uint64_t v64 = json_obj.GetUint64();
-      c_type v = static_cast<c_type>(v64);
-      if (v == v64) {
-        return builder_->Append(v);
-      } else {
-        return Status::Invalid("Value ", v64, " out of bounds for ",
-                               this->type_->ToString());
-      }
-      return builder_->Append(v);
-    } else {
-      return JSONTypeError("unsigned int", json_obj.GetType());
-    }
-  }
-
+ private:
   std::shared_ptr<NumericBuilder<Type>> builder_;
 };
 
@@ -234,17 +254,14 @@ class FloatConverter final : public ConcreteConverter<FloatConverter<Type>> {
     if (json_obj.IsNull()) {
       return AppendNull();
     }
-    if (json_obj.IsNumber()) {
-      c_type v = static_cast<c_type>(json_obj.GetDouble());
-      return builder_->Append(v);
-    } else {
-      return JSONTypeError("number", json_obj.GetType());
-    }
+    c_type value;
+    RETURN_NOT_OK(ConvertNumber<Type>(json_obj, &value));
+    return builder_->Append(value);
   }
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<NumericBuilder<Type>> builder_;
 };
 
@@ -281,12 +298,50 @@ class DecimalConverter final : public ConcreteConverter<DecimalConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<DecimalBuilder> builder_;
   Decimal128Type* decimal_type_;
 };
 
 // ------------------------------------------------------------------------
+// Converter for timestamp arrays
+
+class TimestampConverter final : public ConcreteConverter<TimestampConverter> {
+ public:
+  explicit TimestampConverter(const std::shared_ptr<DataType>& type)
+      : from_string_(type) {
+    this->type_ = type;
+    builder_ = std::make_shared<TimestampBuilder>(type, default_memory_pool());
+  }
+
+  Status AppendNull() override { return builder_->AppendNull(); }
+
+  Status AppendValue(const rj::Value& json_obj) override {
+    if (json_obj.IsNull()) {
+      return AppendNull();
+    }
+    int64_t value;
+    if (json_obj.IsNumber()) {
+      RETURN_NOT_OK(ConvertNumber<Int64Type>(json_obj, &value));
+    } else if (json_obj.IsString()) {
+      auto view = util::string_view(json_obj.GetString(), json_obj.GetStringLength());
+      if (!from_string_(view.data(), view.size(), &value)) {
+        return Status::Invalid("couldn't parse timestamp from ", view);
+      }
+    } else {
+      return JSONTypeError("timestamp", json_obj.GetType());
+    }
+    return builder_->Append(value);
+  }
+
+  std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+  ::arrow::internal::StringConverter<TimestampType> from_string_;
+  std::shared_ptr<TimestampBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
 // Converter for binary and string arrays
 
 class StringConverter final : public ConcreteConverter<StringConverter> {
@@ -312,7 +367,7 @@ class StringConverter final : public ConcreteConverter<StringConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<BinaryBuilder> builder_;
 };
 
@@ -349,7 +404,7 @@ class FixedSizeBinaryConverter final
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<FixedSizeBinaryBuilder> builder_;
 };
 
@@ -381,7 +436,7 @@ class ListConverter final : public ConcreteConverter<ListConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<ListBuilder> builder_;
   std::shared_ptr<Converter> child_converter_;
 };
@@ -456,7 +511,7 @@ class StructConverter final : public ConcreteConverter<StructConverter> {
 
   std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
 
- protected:
+ private:
   std::shared_ptr<StructBuilder> builder_;
   std::vector<std::shared_ptr<Converter>> child_converters_;
 };
@@ -481,7 +536,7 @@ Status GetConverter(const std::shared_ptr<DataType>& type,
     SIMPLE_CONVERTER_CASE(Type::DATE32, IntegerConverter<Date32Type>)
     SIMPLE_CONVERTER_CASE(Type::INT64, IntegerConverter<Int64Type>)
     SIMPLE_CONVERTER_CASE(Type::TIME64, IntegerConverter<Int64Type>)
-    SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, IntegerConverter<Int64Type>)
+    SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, TimestampConverter)
     SIMPLE_CONVERTER_CASE(Type::DATE64, IntegerConverter<Date64Type>)
     SIMPLE_CONVERTER_CASE(Type::UINT8, IntegerConverter<UInt8Type>)
     SIMPLE_CONVERTER_CASE(Type::UINT16, IntegerConverter<UInt16Type>)
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/api.h
index 00fbc2e..47b5668 100644
--- a/cpp/src/arrow/json/api.h
+++ b/cpp/src/arrow/json/api.h
@@ -15,10 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_API_H
-#define ARROW_JSON_API_H
+#pragma once
 
 #include "arrow/json/options.h"
 #include "arrow/json/reader.h"
-
-#endif  // ARROW_JSON_API_H
diff --git a/cpp/src/arrow/json/chunker-test.cc b/cpp/src/arrow/json/chunker-test.cc
index 88fbc31..f2d9cc2 100644
--- a/cpp/src/arrow/json/chunker-test.cc
+++ b/cpp/src/arrow/json/chunker-test.cc
@@ -15,20 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdint>
 #include <memory>
 #include <numeric>
 #include <string>
 
 #include <gtest/gtest.h>
-#include <rapidjson/document.h>
-#include <rapidjson/prettywriter.h>
-#include <rapidjson/reader.h>
 
+#include "arrow/buffer.h"
 #include "arrow/json/chunker.h"
-#include "arrow/json/options.h"
-#include "arrow/testing/gtest_common.h"
-#include "arrow/testing/util.h"
+#include "arrow/json/test-common.h"
+#include "arrow/testing/gtest_util.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
@@ -41,89 +37,88 @@ namespace json {
 using util::string_view;
 
 template <typename Lines>
-std::string join(Lines&& lines, std::string delimiter) {
-  std::string joined;
+static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) {
+  std::shared_ptr<Buffer> joined;
+  BufferVector line_buffers;
+  auto delimiter_buffer = std::make_shared<Buffer>(delimiter);
   for (const auto& line : lines) {
-    joined += line + delimiter;
+    line_buffers.push_back(std::make_shared<Buffer>(line));
+    line_buffers.push_back(delimiter_buffer);
   }
+  ABORT_NOT_OK(ConcatenateBuffers(line_buffers, default_memory_pool(), &joined));
   return joined;
 }
 
-std::string PrettyPrint(string_view one_line) {
-  rapidjson::Document document;
-  document.Parse(one_line.data());
-  rapidjson::StringBuffer sb;
-  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
-  document.Accept(writer);
-  return sb.GetString();
+static bool WhitespaceOnly(string_view s) {
+  return s.find_first_not_of(" \t\r\n") == string_view::npos;
 }
 
-bool WhitespaceOnly(string_view s) {
-  return s.find_first_not_of(" \t\r\n") == string_view::npos;
+static bool WhitespaceOnly(const std::shared_ptr<Buffer>& b) {
+  return WhitespaceOnly(string_view(*b));
 }
 
-std::size_t ConsumeWholeObject(string_view* str) {
-  auto fail = [str] {
-    *str = string_view();
+static std::size_t ConsumeWholeObject(std::shared_ptr<Buffer>* buf) {
+  auto str = string_view(**buf);
+  auto fail = [buf] {
+    *buf = nullptr;
     return string_view::npos;
   };
-  if (WhitespaceOnly(*str)) return fail();
-  auto open_brace = str->find_first_not_of(" \t\r\n");
-  if (str->at(open_brace) != '{') return fail();
-  auto close_brace = str->find_first_of("}");
+  if (WhitespaceOnly(str)) return fail();
+  auto open_brace = str.find_first_not_of(" \t\r\n");
+  if (str.at(open_brace) != '{') return fail();
+  auto close_brace = str.find_first_of("}");
   if (close_brace == string_view::npos) return fail();
-  if (str->at(close_brace) != '}') return fail();
   auto length = close_brace + 1;
-  *str = str->substr(length);
+  *buf = SliceBuffer(*buf, length);
   return length;
 }
 
-void AssertWholeObjects(Chunker& chunker, string_view block, int expected_count) {
-  string_view whole;
-  ASSERT_OK(chunker.Process(block, &whole));
+void AssertWholeObjects(Chunker& chunker, const std::shared_ptr<Buffer>& block,
+                        int expected_count) {
+  std::shared_ptr<Buffer> whole, partial;
+  ASSERT_OK(chunker.Process(block, &whole, &partial));
   int count = 0;
-  while (!WhitespaceOnly(whole)) {
+  while (whole && !WhitespaceOnly(whole)) {
     if (ConsumeWholeObject(&whole) == string_view::npos) FAIL();
     ++count;
   }
   ASSERT_EQ(count, expected_count);
 }
 
-void AssertChunking(Chunker& chunker, std::string str, int total_count) {
+void AssertChunking(Chunker& chunker, std::shared_ptr<Buffer> buf, int total_count) {
   // First chunkize whole JSON block
-  AssertWholeObjects(chunker, str, total_count);
+  AssertWholeObjects(chunker, buf, total_count);
 
   // Then chunkize incomplete substrings of the block
-  for (int i = 0; i != total_count; ++i) {
+  for (int i = 0; i < total_count; ++i) {
     // ensure shearing the closing brace off the last object causes it to be chunked out
-    string_view str_view(str);
-    auto last_brace = str_view.find_last_of('}');
-    AssertWholeObjects(chunker, str.substr(0, last_brace), total_count - i - 1);
+    auto last_brace = string_view(*buf).find_last_of('}');
+    AssertWholeObjects(chunker, SliceBuffer(buf, 0, last_brace), total_count - i - 1);
 
     // ensure skipping one object reduces the count by one
-    ASSERT_NE(ConsumeWholeObject(&str_view), string_view::npos);
-    str = str_view.to_string();
-    AssertWholeObjects(chunker, str, total_count - i - 1);
+    ASSERT_NE(ConsumeWholeObject(&buf), string_view::npos);
+    AssertWholeObjects(chunker, buf, total_count - i - 1);
   }
 }
 
-void AssertStraddledChunking(Chunker& chunker, string_view str) {
-  auto first_half = str.substr(0, str.size() / 2).to_string();
-  auto second_half = str.substr(str.size() / 2);
+void AssertStraddledChunking(Chunker& chunker, const std::shared_ptr<Buffer>& buf) {
+  auto first_half = SliceBuffer(buf, 0, buf->size() / 2);
+  auto second_half = SliceBuffer(buf, buf->size() / 2);
   AssertChunking(chunker, first_half, 1);
-  string_view first_whole;
-  ASSERT_OK(chunker.Process(first_half, &first_whole));
-  ASSERT_TRUE(string_view(first_half).starts_with(first_whole));
-  auto partial = string_view(first_half).substr(first_whole.size());
-  string_view completion;
-  ASSERT_OK(chunker.Process(partial, second_half, &completion));
-  ASSERT_TRUE(second_half.starts_with(completion));
-  auto straddling = partial.to_string() + completion.to_string();
-  string_view straddling_view(straddling);
-  auto length = ConsumeWholeObject(&straddling_view);
+  std::shared_ptr<Buffer> first_whole, partial;
+  ASSERT_OK(chunker.Process(first_half, &first_whole, &partial));
+  ASSERT_TRUE(string_view(*first_half).starts_with(string_view(*first_whole)));
+  std::shared_ptr<Buffer> completion, rest;
+  ASSERT_OK(chunker.ProcessWithPartial(partial, second_half, &completion, &rest));
+  ASSERT_TRUE(string_view(*second_half).starts_with(string_view(*completion)));
+  std::shared_ptr<Buffer> straddling;
+  ASSERT_OK(
+      ConcatenateBuffers({partial, completion}, default_memory_pool(), &straddling));
+  auto length = ConsumeWholeObject(&straddling);
   ASSERT_NE(length, string_view::npos);
   ASSERT_NE(length, 0);
-  auto final_whole = second_half.substr(completion.size());
+  auto final_whole = SliceBuffer(second_half, completion->size());
+  ASSERT_EQ(string_view(*final_whole), string_view(*rest));
   length = ConsumeWholeObject(&final_whole);
   ASSERT_NE(length, string_view::npos);
   ASSERT_NE(length, 0);
@@ -142,12 +137,12 @@ class BaseChunkerTest : public ::testing::TestWithParam<bool> {
   std::unique_ptr<Chunker> chunker_;
 };
 
-INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
-
 INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values(false));
 
+INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
+
 constexpr auto object_count = 3;
-const std::vector<std::string>& lines() {
+static const std::vector<std::string>& lines() {
   static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})",
                                              R"({"0":"def","1":"","2":"gh"})",
                                              R"({"0":"","1":"ij","2":"kl"})"};
@@ -159,8 +154,10 @@ TEST_P(BaseChunkerTest, Basics) {
 }
 
 TEST_P(BaseChunkerTest, Empty) {
-  AssertChunking(*chunker_, "\n", 0);
-  AssertChunking(*chunker_, "\n\n", 0);
+  auto empty = std::make_shared<Buffer>("\n");
+  AssertChunking(*chunker_, empty, 0);
+  empty = std::make_shared<Buffer>("\n\n");
+  AssertChunking(*chunker_, empty, 0);
 }
 
 TEST(ChunkerTest, PrettyPrinted) {
@@ -193,15 +190,18 @@ TEST(ChunkerTest, StraddlingSingleLine) {
 }
 
 TEST_P(BaseChunkerTest, StraddlingEmpty) {
-  auto joined = join(lines(), "\n");
-  auto first = string_view(joined).substr(0, lines()[0].size() + 1);
-  auto rest = string_view(joined).substr(first.size());
-  string_view first_whole;
-  ASSERT_OK(chunker_->Process(first, &first_whole));
-  auto partial = first.substr(first_whole.size());
-  string_view completion;
-  ASSERT_OK(chunker_->Process(partial, rest, &completion));
-  ASSERT_EQ(completion.size(), 0);
+  auto all = join(lines(), "\n");
+
+  auto first = SliceBuffer(all, 0, lines()[0].size() + 1);
+  std::shared_ptr<Buffer> first_whole, partial;
+  ASSERT_OK(chunker_->Process(first, &first_whole, &partial));
+  ASSERT_TRUE(WhitespaceOnly(partial));
+
+  auto others = SliceBuffer(all, first->size());
+  std::shared_ptr<Buffer> completion, rest;
+  ASSERT_OK(chunker_->ProcessWithPartial(partial, others, &completion, &rest));
+  ASSERT_EQ(completion->size(), 0);
+  ASSERT_TRUE(rest->Equals(*others));
 }
 
 }  // namespace json
diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc
index 7b992df..c154a2a 100644
--- a/cpp/src/arrow/json/chunker.cc
+++ b/cpp/src/arrow/json/chunker.cc
@@ -18,81 +18,82 @@
 #include "arrow/json/chunker.h"
 
 #include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <string>
 #include <utility>
 #include <vector>
 
-#if defined(ARROW_HAVE_SSE4_2)
-#define RAPIDJSON_SSE42 1
-#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
-#endif
-#if defined(ARROW_HAVE_SSE2)
-#define RAPIDJSON_SSE2 1
-#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
-#endif
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/reader.h"
 
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
+#include "arrow/buffer.h"
+#include "arrow/json/options.h"
 #include "arrow/util/stl.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
 namespace json {
 
+namespace rj = arrow::rapidjson;
+
 using internal::make_unique;
 using util::string_view;
 
-Status StraddlingTooLarge() {
+static Status StraddlingTooLarge() {
   return Status::Invalid("straddling object straddles two block boundaries");
 }
 
-std::size_t ConsumeWhitespace(string_view* str) {
+static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) {
 #if defined(ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD)
-  auto nonws_begin =
-      rapidjson::SkipWhitespace_SIMD(str->data(), str->data() + str->size());
-  auto ws_count = nonws_begin - str->data();
-  *str = str->substr(ws_count);
-  return static_cast<std::size_t>(ws_count);
-#undef ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD
+  auto data = reinterpret_cast<const char*>((*buf)->data());
+  auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + (*buf)->size());
+  auto ws_count = nonws_begin - data;
+  *buf = SliceBuffer(*buf, ws_count);
+  return static_cast<size_t>(ws_count);
 #else
-  auto ws_count = str->find_first_not_of(" \t\r\n");
-  *str = str->substr(ws_count);
+  auto ws_count = string_view(**buf).find_first_not_of(" \t\r\n");
+  if (ws_count == string_view::npos) {
+    ws_count = (*buf)->size();
+  }
+  *buf = SliceBuffer(*buf, ws_count);
   return ws_count;
 #endif
 }
 
 class NewlinesStrictlyDelimitChunker : public Chunker {
  public:
-  Status Process(string_view block, string_view* chunked) override {
-    auto last_newline = block.find_last_of("\n\r");
+  Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+                 std::shared_ptr<Buffer>* partial) override {
+    auto last_newline = string_view(*block).find_last_of("\n\r");
     if (last_newline == string_view::npos) {
       // no newlines in this block, return empty chunk
-      *chunked = string_view();
+      *whole = SliceBuffer(block, 0, 0);
+      *partial = block;
     } else {
-      *chunked = block.substr(0, last_newline + 1);
+      *whole = SliceBuffer(block, 0, last_newline + 1);
+      *partial = SliceBuffer(block, last_newline + 1);
     }
     return Status::OK();
   }
 
-  Status Process(string_view partial, string_view block,
-                 string_view* completion) override {
+  Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
+                            const std::shared_ptr<Buffer>& block,
+                            std::shared_ptr<Buffer>* completion,
+                            std::shared_ptr<Buffer>* rest) override {
+    auto partial = partial_original;
     ConsumeWhitespace(&partial);
-    if (partial.size() == 0) {
+    if (partial->size() == 0) {
       // if partial is empty, don't bother looking for completion
-      *completion = string_view();
+      *completion = SliceBuffer(block, 0, 0);
+      *rest = block;
       return Status::OK();
     }
-    auto first_newline = block.find_first_of("\n\r");
+    auto first_newline = string_view(*block).find_first_of("\n\r");
     if (first_newline == string_view::npos) {
       // no newlines in this block; straddling object straddles *two* block boundaries.
       // retry with larger buffer
       return StraddlingTooLarge();
     }
-    *completion = block.substr(0, first_newline + 1);
+    *completion = SliceBuffer(block, 0, first_newline + 1);
+    *rest = SliceBuffer(block, first_newline + 1);
     return Status::OK();
   }
 };
@@ -104,7 +105,12 @@ class MultiStringStream {
   using Ch = char;
   explicit MultiStringStream(std::vector<string_view> strings)
       : strings_(std::move(strings)) {
-    std::remove(strings_.begin(), strings_.end(), string_view(""));
+    std::reverse(strings_.begin(), strings_.end());
+  }
+  explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
+    for (size_t i = 0; i < buffers.size(); ++i) {
+      strings_[i] = string_view(*buffers[i]);
+    }
     std::reverse(strings_.begin(), strings_.end());
   }
   char Peek() const {
@@ -122,35 +128,35 @@ class MultiStringStream {
     ++index_;
     return taken;
   }
-  std::size_t Tell() { return index_; }
+  size_t Tell() { return index_; }
   void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
   void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
   char* PutBegin() {
     ARROW_LOG(FATAL) << "not implemented";
     return nullptr;
   }
-  std::size_t PutEnd(char*) {
+  size_t PutEnd(char*) {
     ARROW_LOG(FATAL) << "not implemented";
     return 0;
   }
 
  private:
-  std::size_t index_ = 0;
+  size_t index_ = 0;
   std::vector<string_view> strings_;
 };
 
 template <typename Stream>
-std::size_t ConsumeWholeObject(Stream&& stream) {
-  static constexpr unsigned parse_flags = rapidjson::kParseIterativeFlag |
-                                          rapidjson::kParseStopWhenDoneFlag |
-                                          rapidjson::kParseNumbersAsStringsFlag;
-  rapidjson::BaseReaderHandler<rapidjson::UTF8<>> handler;
-  rapidjson::Reader reader;
+static size_t ConsumeWholeObject(Stream&& stream) {
+  static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
+                                          rj::kParseStopWhenDoneFlag |
+                                          rj::kParseNumbersAsStringsFlag;
+  rj::BaseReaderHandler<rj::UTF8<>> handler;
+  rj::Reader reader;
   // parse a single JSON object
   switch (reader.Parse<parse_flags>(stream, handler).Code()) {
-    case rapidjson::kParseErrorNone:
+    case rj::kParseErrorNone:
       return stream.Tell();
-    case rapidjson::kParseErrorDocumentEmpty:
+    case rj::kParseErrorDocumentEmpty:
       return 0;
     default:
       // rapidjson emitted an error, the most recent object was partial
@@ -160,37 +166,44 @@ std::size_t ConsumeWholeObject(Stream&& stream) {
 
 class ParsingChunker : public Chunker {
  public:
-  Status Process(string_view block, string_view* chunked) override {
-    if (block.size() == 0) {
-      *chunked = string_view();
+  Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+                 std::shared_ptr<Buffer>* partial) override {
+    if (block->size() == 0) {
+      *whole = SliceBuffer(block, 0, 0);
+      *partial = block;
       return Status::OK();
     }
-    std::size_t total_length = 0;
-    for (auto consumed = block;; consumed = block.substr(total_length)) {
-      using rapidjson::MemoryStream;
-      MemoryStream ms(consumed.data(), consumed.size());
-      using InputStream = rapidjson::EncodedInputStream<rapidjson::UTF8<>, MemoryStream>;
+    size_t total_length = 0;
+    for (auto consumed = block;; consumed = SliceBuffer(block, total_length)) {
+      rj::MemoryStream ms(reinterpret_cast<const char*>(consumed->data()),
+                          consumed->size());
+      using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
       auto length = ConsumeWholeObject(InputStream(ms));
       if (length == string_view::npos || length == 0) {
         // found incomplete object or consumed is empty
         break;
       }
-      if (length > consumed.size()) {
-        total_length += consumed.size();
+      if (static_cast<int64_t>(length) > consumed->size()) {
+        total_length += consumed->size();
         break;
       }
       total_length += length;
     }
-    *chunked = block.substr(0, total_length);
+    *whole = SliceBuffer(block, 0, total_length);
+    *partial = SliceBuffer(block, total_length);
     return Status::OK();
   }
 
-  Status Process(string_view partial, string_view block,
-                 string_view* completion) override {
+  Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
+                            const std::shared_ptr<Buffer>& block,
+                            std::shared_ptr<Buffer>* completion,
+                            std::shared_ptr<Buffer>* rest) override {
+    auto partial = partial_original;
     ConsumeWhitespace(&partial);
-    if (partial.size() == 0) {
+    if (partial->size() == 0) {
       // if partial is empty, don't bother looking for completion
-      *completion = string_view();
+      *completion = SliceBuffer(block, 0, 0);
+      *rest = block;
       return Status::OK();
     }
     auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
@@ -199,12 +212,14 @@ class ParsingChunker : public Chunker {
       // retry with larger buffer
       return StraddlingTooLarge();
     }
-    *completion = block.substr(0, length - partial.size());
+    auto completion_length = length - partial->size();
+    *completion = SliceBuffer(block, 0, completion_length);
+    *rest = SliceBuffer(block, completion_length);
     return Status::OK();
   }
 };
 
-std::unique_ptr<Chunker> Chunker::Make(ParseOptions options) {
+std::unique_ptr<Chunker> Chunker::Make(const ParseOptions& options) {
   if (!options.newlines_in_values) {
     return make_unique<NewlinesStrictlyDelimitChunker>();
   }
diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h
index a74abf2..0f94d81 100644
--- a/cpp/src/arrow/json/chunker.h
+++ b/cpp/src/arrow/json/chunker.h
@@ -15,21 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_CHUNKER_H
-#define ARROW_JSON_CHUNKER_H
+#pragma once
 
 #include <memory>
 
-#include "arrow/json/options.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
-#include "arrow/util/sse-util.h"
-#include "arrow/util/string_view.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
+
+class Buffer;
+
 namespace json {
 
+struct ParseOptions;
+
 /// \class Chunker
 /// \brief A reusable block-based chunker for JSON data
 ///
@@ -41,17 +42,23 @@ class ARROW_EXPORT Chunker {
 
   /// \brief Carve up a chunk in a block of data to contain only whole objects
   /// \param[in] block json data to be chunked
-  /// \param[out] chunked subrange of block containing whole json objects
-  virtual Status Process(util::string_view block, util::string_view* chunked) = 0;
+  /// \param[out] whole subrange of block containing whole json objects
+  /// \param[out] partial subrange of block a partial json object
+  virtual Status Process(const std::shared_ptr<Buffer>& block,
+                         std::shared_ptr<Buffer>* whole,
+                         std::shared_ptr<Buffer>* partial) = 0;
 
   /// \brief Carve the completion of a partial object out of a block
   /// \param[in] partial incomplete json object
   /// \param[in] block json data
-  /// \param[out] completion subrange of block contining the completion of partial
-  virtual Status Process(util::string_view partial, util::string_view block,
-                         util::string_view* completion) = 0;
+  /// \param[out] completion subrange of block containing the completion of partial
+  /// \param[out] rest subrange of block containing what completion does not cover
+  virtual Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial,
+                                    const std::shared_ptr<Buffer>& block,
+                                    std::shared_ptr<Buffer>* completion,
+                                    std::shared_ptr<Buffer>* rest) = 0;
 
-  static std::unique_ptr<Chunker> Make(ParseOptions options);
+  static std::unique_ptr<Chunker> Make(const ParseOptions& options);
 
  protected:
   Chunker() = default;
@@ -60,5 +67,3 @@ class ARROW_EXPORT Chunker {
 
 }  // namespace json
 }  // namespace arrow
-
-#endif  // ARROW_JSON_CHUNKER_H
diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h
index 0ec1b29..8d27faa 100644
--- a/cpp/src/arrow/json/options.h
+++ b/cpp/src/arrow/json/options.h
@@ -15,20 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_OPTIONS_H
-#define ARROW_JSON_OPTIONS_H
+#pragma once
 
 #include <cstdint>
 #include <memory>
-#include <string>
-#include <unordered_map>
 
-#include "arrow/type.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class DataType;
+class Schema;
 
 namespace json {
 
@@ -64,5 +61,3 @@ struct ARROW_EXPORT ReadOptions {
 
 }  // namespace json
 }  // namespace arrow
-
-#endif  // ARROW_JSON_OPTIONS_H
diff --git a/cpp/src/arrow/json/parser-benchmark.cc b/cpp/src/arrow/json/parser-benchmark.cc
index 82f0b33..bdf83af 100644
--- a/cpp/src/arrow/json/parser-benchmark.cc
+++ b/cpp/src/arrow/json/parser-benchmark.cc
@@ -17,9 +17,9 @@
 
 #include "benchmark/benchmark.h"
 
-#include <iostream>
 #include <string>
 
+#include "arrow/json/chunker.h"
 #include "arrow/json/options.h"
 #include "arrow/json/parser.h"
 #include "arrow/json/test-common.h"
@@ -28,22 +28,72 @@
 namespace arrow {
 namespace json {
 
+static void BenchmarkJSONChunking(benchmark::State& state,  // NOLINT non-const reference
+                                  const std::shared_ptr<Buffer>& json,
+                                  ParseOptions options) {
+  auto chunker = Chunker::Make(options);
+  for (auto _ : state) {
+    std::shared_ptr<Buffer> chunked, partial;
+    ABORT_NOT_OK(chunker->Process(json, &chunked, &partial));
+  }
+  state.SetBytesProcessed(state.iterations() * json->size());
+}
+
+static void BM_ChunkJSONPrettyPrinted(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const int32_t num_rows = 5000;
+  auto options = ParseOptions::Defaults();
+  options.newlines_in_values = true;
+  options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+  std::default_random_engine engine;
+  std::string json;
+  for (int i = 0; i < num_rows; ++i) {
+    StringBuffer sb;
+    Writer writer(sb);
+    ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+    json += PrettyPrint(sb.GetString());
+    json += "\n";
+  }
+  BenchmarkJSONChunking(state, std::make_shared<Buffer>(json), options);
+}
+
+BENCHMARK(BM_ChunkJSONPrettyPrinted)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+static void BM_ChunkJSONLineDelimited(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const int32_t num_rows = 5000;
+  auto options = ParseOptions::Defaults();
+  options.newlines_in_values = false;
+  options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+  std::default_random_engine engine;
+  std::string json;
+  for (int i = 0; i < num_rows; ++i) {
+    StringBuffer sb;
+    Writer writer(sb);
+    ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+    json += sb.GetString();
+    json += "\n";
+  }
+  BenchmarkJSONChunking(state, std::make_shared<Buffer>(json), options);
+}
+
+BENCHMARK(BM_ChunkJSONLineDelimited)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
 static void BenchmarkJSONParsing(benchmark::State& state,  // NOLINT non-const reference
-                                 const std::string& json, int32_t num_rows,
+                                 const std::shared_ptr<Buffer>& json, int32_t num_rows,
                                  ParseOptions options) {
   for (auto _ : state) {
-    std::shared_ptr<Buffer> src;
-    ABORT_NOT_OK(MakeBuffer(json, &src));
-    BlockParser parser(options, src);
-    ABORT_NOT_OK(parser.Parse(src));
-    if (parser.num_rows() != num_rows) {
+    std::unique_ptr<BlockParser> parser;
+    ABORT_NOT_OK(BlockParser::Make(options, &parser));
+    ABORT_NOT_OK(parser->Parse(json));
+    if (parser->num_rows() != num_rows) {
       std::cerr << "Parsing incomplete\n";
       std::abort();
     }
     std::shared_ptr<Array> parsed;
-    ABORT_NOT_OK(parser.Finish(&parsed));
+    ABORT_NOT_OK(parser->Finish(&parsed));
   }
-  state.SetBytesProcessed(state.iterations() * json.size());
+  state.SetBytesProcessed(state.iterations() * json->size());
 }
 
 static void BM_ParseJSONBlockWithSchema(
@@ -52,16 +102,16 @@ static void BM_ParseJSONBlockWithSchema(
   auto options = ParseOptions::Defaults();
   options.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
   options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
-  std::mt19937_64 engine;
+  std::default_random_engine engine;
   std::string json;
-  for (int i = 0; i != num_rows; ++i) {
+  for (int i = 0; i < num_rows; ++i) {
     StringBuffer sb;
     Writer writer(sb);
     ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
     json += sb.GetString();
     json += "\n";
   }
-  BenchmarkJSONParsing(state, json, num_rows, options);
+  BenchmarkJSONParsing(state, std::make_shared<Buffer>(json), num_rows, options);
 }
 
 BENCHMARK(BM_ParseJSONBlockWithSchema)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
diff --git a/cpp/src/arrow/json/parser-test.cc b/cpp/src/arrow/json/parser-test.cc
index c3af4a7..5d14024 100644
--- a/cpp/src/arrow/json/parser-test.cc
+++ b/cpp/src/arrow/json/parser-test.cc
@@ -15,33 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdint>
-#include <iomanip>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include <gtest/gtest.h>
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
 
-#include "arrow/ipc/json-simple.h"
 #include "arrow/json/options.h"
 #include "arrow/json/parser.h"
-#include "arrow/json/reader.h"
 #include "arrow/json/test-common.h"
 #include "arrow/status.h"
-#include "arrow/testing/util.h"
-#include "arrow/util/logging.h"
+#include "arrow/testing/gtest_util.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
+namespace json {
 
 using util::string_view;
 
-namespace json {
-
-std::string scalars_only_src() {
+static std::string scalars_only_src() {
   return R"(
     { "hello": 3.5, "world": false, "yo": "thing" }
     { "hello": 3.2, "world": null }
@@ -50,7 +42,7 @@ std::string scalars_only_src() {
   )";
 }
 
-std::string nested_src() {
+static std::string nested_src() {
   return R"(
     { "hello": 3.5, "world": false, "yo": "thing", "arr": [1, 2, 3], "nuf": {} }
     { "hello": 3.2, "world": null, "arr": [2], "nuf": null }
@@ -59,9 +51,10 @@ std::string nested_src() {
   )";
 }
 
-void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual);
+void AssertUnconvertedStructArraysEqual(const StructArray& expected,
+                                        const StructArray& actual);
 
-void AssertRawArraysEqual(const Array& expected, const Array& actual) {
+void AssertUnconvertedArraysEqual(const Array& expected, const Array& actual) {
   switch (actual.type_id()) {
     case Type::BOOL:
     case Type::NA:
@@ -81,43 +74,38 @@ void AssertRawArraysEqual(const Array& expected, const Array& actual) {
       AssertBufferEqual(*expected_offsets, *actual_offsets);
       auto expected_values = static_cast<const ListArray&>(expected).values();
       auto actual_values = static_cast<const ListArray&>(actual).values();
-      return AssertRawArraysEqual(*expected_values, *actual_values);
+      return AssertUnconvertedArraysEqual(*expected_values, *actual_values);
     }
     case Type::STRUCT:
       ASSERT_EQ(expected.type_id(), Type::STRUCT);
-      return AssertRawStructArraysEqual(static_cast<const StructArray&>(expected),
-                                        static_cast<const StructArray&>(actual));
+      return AssertUnconvertedStructArraysEqual(static_cast<const StructArray&>(expected),
+                                                static_cast<const StructArray&>(actual));
     default:
       FAIL();
   }
 }
 
-void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual) {
+void AssertUnconvertedStructArraysEqual(const StructArray& expected,
+                                        const StructArray& actual) {
   ASSERT_EQ(expected.num_fields(), actual.num_fields());
-  for (int i = 0; i != expected.num_fields(); ++i) {
+  for (int i = 0; i < expected.num_fields(); ++i) {
     auto expected_name = expected.type()->child(i)->name();
     auto actual_name = actual.type()->child(i)->name();
     ASSERT_EQ(expected_name, actual_name);
-    AssertRawArraysEqual(*expected.field(i), *actual.field(i));
+    AssertUnconvertedArraysEqual(*expected.field(i), *actual.field(i));
   }
 }
 
 void AssertParseColumns(ParseOptions options, string_view src_str,
                         const std::vector<std::shared_ptr<Field>>& fields,
                         const std::vector<std::string>& columns_json) {
-  std::shared_ptr<Buffer> src;
-  ASSERT_OK(MakeBuffer(src_str, &src));
-  BlockParser parser(options, src);
-  ASSERT_OK(parser.Parse(src));
   std::shared_ptr<Array> parsed;
-  ASSERT_OK(parser.Finish(&parsed));
+  ASSERT_OK(ParseFromString(options, src_str, &parsed));
   auto struct_array = std::static_pointer_cast<StructArray>(parsed);
-  for (size_t i = 0; i != fields.size(); ++i) {
-    // std::shared_ptr<Array> column_expected;
-    // ASSERT_OK(ArrayFromJSON(fields[i]->type(), columns_json[i], &column_expected));
+  for (size_t i = 0; i < fields.size(); ++i) {
     auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
     auto column = struct_array->GetFieldByName(fields[i]->name());
-    AssertRawArraysEqual(*column_expected, *column);
+    AssertUnconvertedArraysEqual(*column_expected, *column);
   }
 }
 
@@ -160,10 +148,8 @@ TEST(BlockParserWithSchema, FailOnInconvertible) {
   auto options = ParseOptions::Defaults();
   options.explicit_schema = schema({field("a", int32())});
   options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
-  std::shared_ptr<Buffer> src;
-  ASSERT_OK(MakeBuffer("{\"a\":0}\n{\"a\":true}", &src));
-  BlockParser parser(options, src);
-  ASSERT_RAISES(Invalid, parser.Parse(src));
+  std::shared_ptr<Array> parsed;
+  ASSERT_RAISES(Invalid, ParseFromString(options, "{\"a\":0}\n{\"a\":true}", &parsed));
 }
 
 TEST(BlockParserWithSchema, Nested) {
@@ -183,10 +169,8 @@ TEST(BlockParserWithSchema, FailOnIncompleteJson) {
   auto options = ParseOptions::Defaults();
   options.explicit_schema = schema({field("a", int32())});
   options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
-  std::shared_ptr<Buffer> src;
-  ASSERT_OK(MakeBuffer("{\"a\":0, \"b\"", &src));
-  BlockParser parser(options, src);
-  ASSERT_RAISES(Invalid, parser.Parse(src));
+  std::shared_ptr<Array> parsed;
+  ASSERT_RAISES(Invalid, ParseFromString(options, "{\"a\":0, \"b\"", &parsed));
 }
 
 TEST(BlockParser, Basics) {
@@ -210,40 +194,5 @@ TEST(BlockParser, Nested) {
                       R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"});
 }
 
-void AssertParseOne(ParseOptions options, string_view src_str,
-                    const std::vector<std::shared_ptr<Field>>& fields,
-                    const std::vector<std::string>& columns_json) {
-  std::shared_ptr<Buffer> src;
-  ASSERT_OK(MakeBuffer(src_str, &src));
-  std::shared_ptr<RecordBatch> parsed;
-  ASSERT_OK(ParseOne(options, src, &parsed));
-  for (size_t i = 0; i != fields.size(); ++i) {
-    auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
-    auto column = parsed->GetColumnByName(fields[i]->name());
-    AssertArraysEqual(*column_expected, *column);
-  }
-}
-
-TEST(ParseOne, Basics) {
-  auto options = ParseOptions::Defaults();
-  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
-  AssertParseOne(
-      options, scalars_only_src(),
-      {field("hello", float64()), field("world", boolean()), field("yo", utf8())},
-      {"[3.5, 3.2, 3.4, 0.0]", "[false, null, null, true]",
-       "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
-}
-
-TEST(ParseOne, Nested) {
-  auto options = ParseOptions::Defaults();
-  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
-  AssertParseOne(
-      options, nested_src(),
-      {field("yo", utf8()), field("arr", list(int64())),
-       field("nuf", struct_({field("ps", int64())}))},
-      {"[\"thing\", null, \"\xe5\xbf\x8d\", null]", R"([[1, 2, 3], [2], [], null])",
-       R"([{"ps":null}, null, {"ps":78}, {"ps":90}])"});
-}
-
 }  // namespace json
 }  // namespace arrow
diff --git a/cpp/src/arrow/json/parser.cc b/cpp/src/arrow/json/parser.cc
index 99b8911..3fbd1d7 100644
--- a/cpp/src/arrow/json/parser.cc
+++ b/cpp/src/arrow/json/parser.cc
@@ -17,95 +17,109 @@
 
 #include "arrow/json/parser.h"
 
-#include <algorithm>
-#include <cstdio>
 #include <limits>
-#include <sstream>
 #include <tuple>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/error/en.h"
+#include "rapidjson/reader.h"
 
 #include "arrow/array.h"
 #include "arrow/buffer-builder.h"
 #include "arrow/builder.h"
-#include "arrow/csv/converter.h"
 #include "arrow/memory_pool.h"
-#include "arrow/record_batch.h"
-#include "arrow/status.h"
 #include "arrow/type.h"
-#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/stl.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/trie.h"
 #include "arrow/visitor_inline.h"
 
 namespace arrow {
 namespace json {
 
+namespace rj = arrow::rapidjson;
+
 using internal::BitsetStack;
 using internal::checked_cast;
+using internal::make_unique;
 using util::string_view;
 
 template <typename... T>
-Status ParseError(T&&... t) {
+static Status ParseError(T&&... t) {
   return Status::Invalid("JSON parse error: ", std::forward<T>(t)...);
 }
 
-Status KindChangeError(Kind::type from, Kind::type to) {
-  auto from_name = Tag(from)->value(0);
-  auto to_name = Tag(to)->value(0);
-  return ParseError("A column changed from ", from_name, " to ", to_name);
+static Status KindChangeError(Kind::type from, Kind::type to) {
+  return ParseError("A column changed from ", Kind::Name(from), " to ", Kind::Name(to));
 }
 
-/// Similar to StringBuilder, but appends bytes into the provided buffer without
-/// resizing. This builder does not support appending nulls.
-class UnsafeStringBuilder {
- public:
-  UnsafeStringBuilder(MemoryPool* pool, const std::shared_ptr<Buffer>& buffer)
-      : offsets_builder_(pool), values_buffer_(buffer) {
-    DCHECK_NE(values_buffer_, nullptr);
-  }
-
-  Status Append(string_view str) {
-    DCHECK_LE(static_cast<int64_t>(str.size()), capacity() - values_end_);
-    RETURN_NOT_OK(AppendNextOffset());
-    std::memcpy(values_buffer_->mutable_data() + values_end_, str.data(), str.size());
-    length_ += 1;
-    values_end_ += str.size();
-    return Status::OK();
-  }
-
-  // Builder may not be reused after Finish()
-  Status Finish(std::shared_ptr<Array>* out, int64_t* values_length = nullptr) && {
-    RETURN_NOT_OK(AppendNextOffset());
-    if (values_length) {
-      *values_length = values_end_;
-    }
-    std::shared_ptr<Buffer> offsets;
-    RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
-    auto data = ArrayData::Make(utf8(), length_, {nullptr, offsets, values_buffer_}, 0);
-    *out = MakeArray(data);
-    return Status::OK();
-  }
+const std::string& Kind::Name(Kind::type kind) {
+  static const std::string names[] = {"null",   "boolean", "number",
+                                      "string", "array",   "object"};
 
-  int64_t length() { return length_; }
-
-  int64_t capacity() { return values_buffer_->size(); }
+  return names[kind];
+}
 
-  int64_t remaining_capacity() { return values_buffer_->size() - values_end_; }
+const std::shared_ptr<const KeyValueMetadata>& Kind::Tag(Kind::type kind) {
+  static const std::shared_ptr<const KeyValueMetadata> tags[] = {
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kNull)}}),
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kBoolean)}}),
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kNumber)}}),
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kString)}}),
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kArray)}}),
+      key_value_metadata({{"json_kind", Kind::Name(Kind::kObject)}}),
+  };
+  return tags[kind];
+}
 
- private:
-  Status AppendNextOffset() {
-    return offsets_builder_.Append(static_cast<int32_t>(values_end_));
+static internal::Trie MakeFromTagTrie() {
+  internal::TrieBuilder builder;
+  for (auto kind : {Kind::kNull, Kind::kBoolean, Kind::kNumber, Kind::kString,
+                    Kind::kArray, Kind::kObject}) {
+    DCHECK_OK(builder.Append(Kind::Name(kind)));
   }
+  auto name_to_kind = builder.Finish();
+  DCHECK_OK(name_to_kind.Validate());
+  return name_to_kind;
+}
 
-  int64_t length_ = 0;
-  int64_t values_end_ = 0;
-  TypedBufferBuilder<int32_t> offsets_builder_;
-  std::shared_ptr<Buffer> values_buffer_;
-};
+Kind::type Kind::FromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
+  static internal::Trie name_to_kind = MakeFromTagTrie();
+  DCHECK_NE(tag->FindKey("json_kind"), -1);
+  util::string_view name = tag->value(tag->FindKey("json_kind"));
+  DCHECK_NE(name_to_kind.Find(name), -1);
+  return static_cast<Kind::type>(name_to_kind.Find(name));
+}
+
+Status Kind::ForType(const DataType& type, Kind::type* kind) {
+  struct {
+    Status Visit(const NullType&) { return SetKind(Kind::kNull); }
+    Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
+    Status Visit(const Number&) { return SetKind(Kind::kNumber); }
+    Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
+    Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
+    Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
+    Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
+    Status Visit(const DictionaryType& dict_type) {
+      return Kind::ForType(*dict_type.dictionary()->type(), kind_);
+    }
+    Status Visit(const ListType&) { return SetKind(Kind::kArray); }
+    Status Visit(const StructType&) { return SetKind(Kind::kObject); }
+    Status Visit(const DataType& not_impl) {
+      return Status::NotImplemented("JSON parsing of ", not_impl);
+    }
+    Status SetKind(Kind::type kind) {
+      *kind_ = kind;
+      return Status::OK();
+    }
+    Kind::type* kind_;
+  } visitor = {kind};
+  return VisitTypeInline(type, &visitor);
+}
 
 /// \brief ArrayBuilder for parsed but unconverted arrays
 template <Kind::type>
@@ -202,8 +216,9 @@ class ScalarBuilder {
   explicit ScalarBuilder(MemoryPool* pool)
       : data_builder_(pool), null_bitmap_builder_(pool) {}
 
-  Status Append(int32_t index) {
+  Status Append(int32_t index, int32_t value_length) {
     RETURN_NOT_OK(data_builder_.Append(index));
+    values_length_ += value_length;
     return null_bitmap_builder_.Append(true);
   }
 
@@ -229,9 +244,10 @@ class ScalarBuilder {
 
   int64_t length() { return null_bitmap_builder_.length(); }
 
-  // TODO(bkietz) track total length of bytes for later simpler allocation
+  int32_t values_length() { return values_length_; }
 
  private:
+  int32_t values_length_;
   TypedBufferBuilder<int32_t> data_builder_;
   TypedBufferBuilder<bool> null_bitmap_builder_;
 };
@@ -280,8 +296,8 @@ class RawArrayBuilder<Kind::kArray> {
     RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
     std::shared_ptr<Array> values;
     RETURN_NOT_OK(handler.Finish(value_builder_, &values));
-    auto type = list(
-        field("item", values->type(), value_builder_.nullable, Tag(value_builder_.kind)));
+    auto type = list(field("item", values->type(), value_builder_.nullable,
+                           Kind::Tag(value_builder_.kind)));
     *out = MakeArray(ArrayData::Make(type, size, {null_bitmap, offsets}, {values->data()},
                                      null_count));
     return Status::OK();
@@ -346,12 +362,12 @@ class RawArrayBuilder<Kind::kObject> {
 
     std::vector<std::shared_ptr<Field>> fields(num_fields());
     std::vector<std::shared_ptr<ArrayData>> child_data(num_fields());
-    for (int i = 0; i != num_fields(); ++i) {
+    for (int i = 0; i < num_fields(); ++i) {
       std::shared_ptr<Array> values;
       RETURN_NOT_OK(handler.Finish(field_builders_[i], &values));
       child_data[i] = values->data();
       fields[i] = field(field_names[i].to_string(), values->type(),
-                        field_builders_[i].nullable, Tag(field_builders_[i].kind));
+                        field_builders_[i].nullable, Kind::Tag(field_builders_[i].kind));
     }
 
     *out = MakeArray(ArrayData::Make(struct_(std::move(fields)), size, {null_bitmap},
@@ -367,11 +383,11 @@ class RawArrayBuilder<Kind::kObject> {
   TypedBufferBuilder<bool> null_bitmap_builder_;
 };
 
-/// Three implementations are provided for BlockParser::Impl, one for each
+/// Three implementations are provided for BlockParser, one for each
 /// UnexpectedFieldBehavior. However most of the logic is identical in each
 /// case, so the majority of the implementation is in this base class
-class HandlerBase : public BlockParser::Impl,
-                    public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, HandlerBase> {
+class HandlerBase : public BlockParser,
+                    public rj::BaseReaderHandler<rj::UTF8<>, HandlerBase> {
  public:
   /// Retrieve a pointer to a builder from a BuilderPtr
   template <Kind::type kind>
@@ -384,9 +400,9 @@ class HandlerBase : public BlockParser::Impl,
   /// Accessor for a stored error Status
   Status Error() { return status_; }
 
-  /// \defgroup rapidjson-handler-interface functions expected by rapidjson::Reader
+  /// \defgroup rapidjson-handler-interface functions expected by rj::Reader
   ///
-  /// bool Key(const char* data, rapidjson::SizeType size, ...) is omitted since
+  /// bool Key(const char* data, rj::SizeType size, ...) is omitted since
   /// the behavior varies greatly between UnexpectedFieldBehaviors
   ///
   /// @{
@@ -400,12 +416,12 @@ class HandlerBase : public BlockParser::Impl,
     return status_.ok();
   }
 
-  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+  bool RawNumber(const char* data, rj::SizeType size, ...) {
     status_ = AppendScalar<Kind::kNumber>(string_view(data, size));
     return status_.ok();
   }
 
-  bool String(const char* data, rapidjson::SizeType size, ...) {
+  bool String(const char* data, rj::SizeType size, ...) {
     status_ = AppendScalar<Kind::kString>(string_view(data, size));
     return status_.ok();
   }
@@ -425,22 +441,19 @@ class HandlerBase : public BlockParser::Impl,
     return status_.ok();
   }
 
-  bool EndArray(rapidjson::SizeType size) {
+  bool EndArray(rj::SizeType size) {
     status_ = EndArrayImpl(size);
     return status_.ok();
   }
   /// @}
 
   /// \brief Set up builders using an expected Schema
-  Status SetSchema(const Schema& s) {
-    DCHECK_EQ(arena<Kind::kObject>().size(), 1);
-    for (const auto& f : s.fields()) {
-      BuilderPtr field_builder;
-      RETURN_NOT_OK(MakeBuilder(*f->type(), 0, &field_builder));
-      field_builder.nullable = f->nullable();
-      Cast<Kind::kObject>(builder_)->AddField(f->name(), field_builder);
+  Status Initialize(const std::shared_ptr<Schema>& s) {
+    auto type = struct_({});
+    if (s) {
+      type = struct_(s->fields());
     }
-    return Status::OK();
+    return MakeBuilder(*type, 0, &builder_);
   }
 
   Status Finish(BuilderPtr builder, std::shared_ptr<Array>* out) {
@@ -466,59 +479,67 @@ class HandlerBase : public BlockParser::Impl,
   }
 
   Status Finish(std::shared_ptr<Array>* parsed) override {
-    RETURN_NOT_OK(std::move(scalar_values_builder_).Finish(&scalar_values_));
+    RETURN_NOT_OK(scalar_values_builder_.Finish(&scalar_values_));
     return Finish(builder_, parsed);
   }
 
-  int32_t num_rows() override { return num_rows_; }
+  explicit HandlerBase(MemoryPool* pool)
+      : BlockParser(pool), scalar_values_builder_(pool) {}
 
  protected:
-  HandlerBase(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
-      : pool_(pool),
-        builder_(Kind::kObject, 0, false),
-        scalar_values_builder_(pool, scalar_storage) {
-    arena<Kind::kObject>().emplace_back(pool_);
-  }
-
   /// finish a column of scalar values (string or number)
   Status FinishScalar(ScalarBuilder* builder, std::shared_ptr<Array>* out) {
     std::shared_ptr<Array> indices;
+    // TODO(bkietz) embed builder->values_length() in this output somehow
     RETURN_NOT_OK(builder->Finish(&indices));
     return DictionaryArray::FromArrays(dictionary(int32(), scalar_values_), indices, out);
   }
 
-  template <typename Handler>
-  Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
-    constexpr auto parse_flags =
-        rapidjson::kParseInsituFlag | rapidjson::kParseIterativeFlag |
-        rapidjson::kParseStopWhenDoneFlag | rapidjson::kParseNumbersAsStringsFlag;
-    auto json_data = reinterpret_cast<char*>(json->mutable_data());
-    rapidjson::GenericInsituStringStream<rapidjson::UTF8<>> ss(json_data);
-    rapidjson::Reader reader;
-
-    for (; num_rows_ != kMaxParserNumRows; ++num_rows_) {
-      // parse a single line of JSON
-      auto ok = reader.Parse<parse_flags>(ss, handler);
+  template <typename Handler, typename Stream>
+  Status DoParse(Handler& handler, Stream&& json) {
+    constexpr auto parse_flags = rj::kParseIterativeFlag | rj::kParseNanAndInfFlag |
+                                 rj::kParseStopWhenDoneFlag |
+                                 rj::kParseNumbersAsStringsFlag;
+
+    rj::Reader reader;
+
+    for (; num_rows_ < kMaxParserNumRows; ++num_rows_) {
+      auto ok = reader.Parse<parse_flags>(json, handler);
       switch (ok.Code()) {
-        case rapidjson::kParseErrorNone:
+        case rj::kParseErrorNone:
           // parse the next object
           continue;
-        case rapidjson::kParseErrorDocumentEmpty: {
+        case rj::kParseErrorDocumentEmpty:
           // parsed all objects, finish
           return Status::OK();
-        }
-        case rapidjson::kParseErrorTermination:
+        case rj::kParseErrorTermination:
           // handler emitted an error
           return handler.Error();
         default:
-          // rapidjson emitted an error
-          return ParseError(rapidjson::GetParseError_En(ok.Code()));
+          // rj emitted an error
+          // FIXME(bkietz) report more error data (at least the byte range of the current
+          // block, and maybe the path to the most recently parsed value?)
+          return ParseError(rj::GetParseError_En(ok.Code()));
       }
     }
     return Status::Invalid("Exceeded maximum rows");
   }
 
-  /// construct a builder of staticallly defined kind in arenas_
+  template <typename Handler>
+  Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
+    auto remaining_capacity = scalar_values_builder_.value_data_capacity() -
+                              scalar_values_builder_.value_data_length();
+    if (json->size() > remaining_capacity) {
+      auto additional_storage = json->size() - remaining_capacity;
+      RETURN_NOT_OK(scalar_values_builder_.ReserveData(additional_storage));
+    }
+
+    rj::MemoryStream ms(reinterpret_cast<const char*>(json->data()), json->size());
+    using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
+    return DoParse(handler, InputStream(ms));
+  }
+
+  /// construct a builder of statically defined kind in arenas_
   template <Kind::type kind>
   Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) {
     builder->index = static_cast<uint32_t>(arena<kind>().size());
@@ -531,7 +552,7 @@ class HandlerBase : public BlockParser::Impl,
   /// construct a builder of whatever kind corresponds to a DataType
   Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) {
     Kind::type kind;
-    RETURN_NOT_OK(KindForType(t, &kind));
+    RETURN_NOT_OK(Kind::ForType(t, &kind));
     switch (kind) {
       case Kind::kNull:
         *builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true);
@@ -606,7 +627,7 @@ class HandlerBase : public BlockParser::Impl,
         auto root = builder_;
         auto struct_builder = Cast<Kind::kObject>(builder_);
         RETURN_NOT_OK(struct_builder->AppendNull());
-        for (int i = 0; i != struct_builder->num_fields(); ++i) {
+        for (int i = 0; i < struct_builder->num_fields(); ++i) {
           builder_ = struct_builder->field_builder(i);
           RETURN_NOT_OK(AppendNull());
         }
@@ -632,8 +653,11 @@ class HandlerBase : public BlockParser::Impl,
       return IllegallyChangedTo(kind);
     }
     auto index = static_cast<int32_t>(scalar_values_builder_.length());
-    RETURN_NOT_OK(Cast<kind>(builder_)->Append(index));
-    return scalar_values_builder_.Append(scalar);
+    auto value_length = static_cast<int32_t>(scalar.size());
+    RETURN_NOT_OK(Cast<kind>(builder_)->Append(index, value_length));
+    RETURN_NOT_OK(scalar_values_builder_.Reserve(1));
+    scalar_values_builder_.UnsafeAppend(scalar);
+    return Status::OK();
   }
 
   /// @}
@@ -668,7 +692,7 @@ class HandlerBase : public BlockParser::Impl,
     auto parent = Cast<Kind::kObject>(builder_stack_.back());
 
     auto expected_count = absent_fields_stack_.TopSize();
-    for (field_index_ = 0; field_index_ != expected_count; ++field_index_) {
+    for (field_index_ = 0; field_index_ < expected_count; ++field_index_) {
       if (!absent_fields_stack_[field_index_]) {
         continue;
       }
@@ -694,7 +718,7 @@ class HandlerBase : public BlockParser::Impl,
     return Status::OK();
   }
 
-  Status EndArrayImpl(rapidjson::SizeType size) {
+  Status EndArrayImpl(rj::SizeType size) {
     PopStacks();
     // append to list_builder here
     auto list_builder = Cast<Kind::kArray>(builder_);
@@ -730,7 +754,6 @@ class HandlerBase : public BlockParser::Impl,
   }
 
   Status status_;
-  MemoryPool* pool_;
   std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>,
              std::vector<RawArrayBuilder<Kind::kNumber>>,
              std::vector<RawArrayBuilder<Kind::kString>>,
@@ -747,9 +770,8 @@ class HandlerBase : public BlockParser::Impl,
   int field_index_;
   // top of this stack == field_index_
   std::vector<int> field_index_stack_;
-  UnsafeStringBuilder scalar_values_builder_;
+  StringBuilder scalar_values_builder_;
   std::shared_ptr<Array> scalar_values_;
-  int32_t num_rows_ = 0;
 };
 
 template <UnexpectedFieldBehavior>
@@ -758,8 +780,7 @@ class Handler;
 template <>
 class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
  public:
-  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
-      : HandlerBase(pool, scalar_storage) {}
+  using HandlerBase::HandlerBase;
 
   Status Parse(const std::shared_ptr<Buffer>& json) override {
     return DoParse(*this, json);
@@ -768,7 +789,7 @@ class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
   /// \ingroup rapidjson-handler-interface
   ///
   /// if an unexpected field is encountered, emit a parse error and bail
-  bool Key(const char* key, rapidjson::SizeType len, ...) {
+  bool Key(const char* key, rj::SizeType len, ...) {
     if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
       return true;
     }
@@ -780,8 +801,7 @@ class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
 template <>
 class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
  public:
-  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
-      : HandlerBase(pool, scalar_storage) {}
+  using HandlerBase::HandlerBase;
 
   Status Parse(const std::shared_ptr<Buffer>& json) override {
     return DoParse(*this, json);
@@ -801,14 +821,14 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
     return HandlerBase::Bool(value);
   }
 
-  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+  bool RawNumber(const char* data, rj::SizeType size, ...) {
     if (Skipping()) {
       return true;
     }
     return HandlerBase::RawNumber(data, size);
   }
 
-  bool String(const char* data, rapidjson::SizeType size, ...) {
+  bool String(const char* data, rj::SizeType size, ...) {
     if (Skipping()) {
       return true;
     }
@@ -826,7 +846,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
   /// \ingroup rapidjson-handler-interface
   ///
   /// if an unexpected field is encountered, skip until its value has been consumed
-  bool Key(const char* key, rapidjson::SizeType len, ...) {
+  bool Key(const char* key, rj::SizeType len, ...) {
     MaybeStopSkipping();
     if (Skipping()) {
       return true;
@@ -854,7 +874,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
     return HandlerBase::StartArray();
   }
 
-  bool EndArray(rapidjson::SizeType size) {
+  bool EndArray(rj::SizeType size) {
     if (Skipping()) {
       return true;
     }
@@ -877,8 +897,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
 template <>
 class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
  public:
-  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
-      : HandlerBase(pool, scalar_storage) {}
+  using HandlerBase::HandlerBase;
 
   Status Parse(const std::shared_ptr<Buffer>& json) override {
     return DoParse(*this, json);
@@ -891,14 +910,14 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
     return HandlerBase::Bool(value);
   }
 
-  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+  bool RawNumber(const char* data, rj::SizeType size, ...) {
     if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) {
       return false;
     }
     return HandlerBase::RawNumber(data, size);
   }
 
-  bool String(const char* data, rapidjson::SizeType size, ...) {
+  bool String(const char* data, rj::SizeType size, ...) {
     if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) {
       return false;
     }
@@ -918,7 +937,7 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
   /// the current parent builder. It is added as a NullBuilder with
   /// (parent.length - 1) leading nulls. The next value parsed
   /// will probably trigger promotion of this field from null
-  bool Key(const char* key, rapidjson::SizeType len, ...) {
+  bool Key(const char* key, rj::SizeType len, ...) {
     if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
       return true;
     }
@@ -967,41 +986,30 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
   }
 };
 
-BlockParser::BlockParser(MemoryPool* pool, ParseOptions options,
-                         const std::shared_ptr<Buffer>& scalar_storage)
-    : pool_(pool), options_(options) {
-  DCHECK(options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ||
-         options_.explicit_schema != nullptr);
-  switch (options_.unexpected_field_behavior) {
+Status BlockParser::Make(MemoryPool* pool, const ParseOptions& options,
+                         std::unique_ptr<BlockParser>* out) {
+  DCHECK(options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ||
+         options.explicit_schema != nullptr);
+
+  switch (options.unexpected_field_behavior) {
     case UnexpectedFieldBehavior::Ignore: {
-      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(
-          pool_, scalar_storage);
-      // FIXME(bkietz) move this to an Initialize()
-      ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
-      impl_ = std::move(handler);
+      *out = make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(pool);
       break;
     }
     case UnexpectedFieldBehavior::Error: {
-      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Error>>(
-          pool_, scalar_storage);
-      ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
-      impl_ = std::move(handler);
+      *out = make_unique<Handler<UnexpectedFieldBehavior::Error>>(pool);
       break;
     }
     case UnexpectedFieldBehavior::InferType:
-      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::InferType>>(
-          pool_, scalar_storage);
-      if (options.explicit_schema) {
-        ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
-      }
-      impl_ = std::move(handler);
+      *out = make_unique<Handler<UnexpectedFieldBehavior::InferType>>(pool);
       break;
   }
+  return static_cast<HandlerBase&>(**out).Initialize(options.explicit_schema);
 }
 
-BlockParser::BlockParser(ParseOptions options,
-                         const std::shared_ptr<Buffer>& scalar_storage)
-    : BlockParser(default_memory_pool(), options, scalar_storage) {}
+Status BlockParser::Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out) {
+  return BlockParser::Make(default_memory_pool(), options, out);
+}
 
 }  // namespace json
 }  // namespace arrow
diff --git a/cpp/src/arrow/json/parser.h b/cpp/src/arrow/json/parser.h
index c001598..22d4af9 100644
--- a/cpp/src/arrow/json/parser.h
+++ b/cpp/src/arrow/json/parser.h
@@ -15,114 +15,71 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_PARSER_H
-#define ARROW_JSON_PARSER_H
+#pragma once
 
-#include <cstdint>
 #include <memory>
 #include <string>
-#include <unordered_map>
 
-#include "arrow/builder.h"
 #include "arrow/json/options.h"
-#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
-#include "arrow/util/string_view.h"
 #include "arrow/util/visibility.h"
-#include "arrow/visitor_inline.h"
 
 namespace arrow {
 
+class Array;
+class Buffer;
 class MemoryPool;
-class RecordBatch;
+class KeyValueMetadata;
+class ResizableBuffer;
 
 namespace json {
 
 struct Kind {
   enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject };
-};
 
-inline static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type k) {
-  static std::shared_ptr<const KeyValueMetadata> tags[] = {
-      key_value_metadata({{"json_kind", "null"}}),
-      key_value_metadata({{"json_kind", "boolean"}}),
-      key_value_metadata({{"json_kind", "number"}}),
-      key_value_metadata({{"json_kind", "string"}}),
-      key_value_metadata({{"json_kind", "array"}}),
-      key_value_metadata({{"json_kind", "object"}})};
-  return tags[static_cast<uint8_t>(k)];
-}
-
-inline static Status KindForType(const DataType& type, Kind::type* kind) {
-  struct {
-    Status Visit(const NullType&) { return SetKind(Kind::kNull); }
-    Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
-    Status Visit(const Number&) { return SetKind(Kind::kNumber); }
-    // XXX should TimeType & DateType be Kind::kNumber or Kind::kString?
-    Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
-    Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
-    Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
-    // XXX should Decimal128Type be Kind::kNumber or Kind::kString?
-    Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
-    Status Visit(const DictionaryType& dict_type) {
-      return KindForType(*dict_type.dictionary()->type(), kind_);
-    }
-    Status Visit(const ListType&) { return SetKind(Kind::kArray); }
-    Status Visit(const StructType&) { return SetKind(Kind::kObject); }
-    Status Visit(const DataType& not_impl) {
-      return Status::NotImplemented("JSON parsing of ", not_impl);
-    }
-    Status SetKind(Kind::type kind) {
-      *kind_ = kind;
-      return Status::OK();
-    }
-    Kind::type* kind_;
-  } visitor = {kind};
-  return VisitTypeInline(type, &visitor);
-}
+  static const std::string& Name(Kind::type);
+
+  static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type);
+
+  static Kind::type FromTag(const std::shared_ptr<const KeyValueMetadata>& tag);
+
+  static Status ForType(const DataType& type, Kind::type* kind);
+};
 
 constexpr int32_t kMaxParserNumRows = 100000;
 
 /// \class BlockParser
 /// \brief A reusable block-based parser for JSON data
 ///
-/// The parser takes a block of newline delimited JSON data and extracts
-/// keys and value pairs, inserting into provided ArrayBuilders.
-/// Parsed data is own by the
-/// parser, so the original buffer can be discarded after Parse() returns.
+/// The parser takes a block of newline delimited JSON data and extracts Arrays
+/// of unconverted strings which can be fed to a Converter to obtain a usable Array.
 class ARROW_EXPORT BlockParser {
  public:
-  BlockParser(MemoryPool* pool, ParseOptions options,
-              const std::shared_ptr<Buffer>& scalar_storage);
-  BlockParser(ParseOptions options, const std::shared_ptr<Buffer>& scalar_storage);
+  virtual ~BlockParser() = default;
 
-  /// \brief Parse a block of data insitu (destructively)
-  /// \warning The input must be null terminated
-  Status Parse(const std::shared_ptr<Buffer>& json) { return impl_->Parse(json); }
+  /// \brief Parse a block of data
+  virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
 
   /// \brief Extract parsed data
-  Status Finish(std::shared_ptr<Array>* parsed) { return impl_->Finish(parsed); }
+  virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
 
   /// \brief Return the number of parsed rows
-  int32_t num_rows() const { return impl_->num_rows(); }
+  int32_t num_rows() const { return num_rows_; }
+
+  static Status Make(MemoryPool* pool, const ParseOptions& options,
+                     std::unique_ptr<BlockParser>* out);
 
-  struct Impl {
-    virtual ~Impl() = default;
-    virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
-    virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
-    virtual int32_t num_rows() = 0;
-  };
+  static Status Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out);
 
  protected:
   ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
 
+  explicit BlockParser(MemoryPool* pool) : pool_(pool) {}
+
   MemoryPool* pool_;
-  const ParseOptions options_;
-  std::unique_ptr<Impl> impl_;
+  int32_t num_rows_ = 0;
 };
 
 }  // namespace json
 }  // namespace arrow
-
-#endif  // ARROW_JSON_PARSER_H
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/rapidjson-defs.h
similarity index 51%
copy from cpp/src/arrow/json/api.h
copy to cpp/src/arrow/json/rapidjson-defs.h
index 00fbc2e..68dd0be 100644
--- a/cpp/src/arrow/json/api.h
+++ b/cpp/src/arrow/json/rapidjson-defs.h
@@ -15,10 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_API_H
-#define ARROW_JSON_API_H
+// Include this file before including any RapidJSON headers.
 
-#include "arrow/json/options.h"
-#include "arrow/json/reader.h"
+#define RAPIDJSON_HAS_STDSTRING 1
+#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
+#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
 
-#endif  // ARROW_JSON_API_H
+// rapidjson will be defined in namespace arrow::rapidjson
+#define RAPIDJSON_NAMESPACE arrow::rapidjson
+#define RAPIDJSON_NAMESPACE_BEGIN \
+  namespace arrow {               \
+  namespace rapidjson {
+#define RAPIDJSON_NAMESPACE_END \
+  }                             \
+  }
+
+#include "arrow/util/sse-util.h"
+
+// enable SIMD whitespace skipping, if available
+#if defined(ARROW_HAVE_SSE2)
+#define RAPIDJSON_SSE2 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+
+#if defined(ARROW_HAVE_SSE4_2)
+#define RAPIDJSON_SSE42 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
index 0e147ab..ebd5488 100644
--- a/cpp/src/arrow/json/reader.cc
+++ b/cpp/src/arrow/json/reader.cc
@@ -18,44 +18,23 @@
 #include "arrow/json/reader.h"
 
 #include <unordered_map>
+#include <utility>
+#include <vector>
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/json/parser.h"
+#include "arrow/table.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/parsing.h"
+#include "arrow/visitor_inline.h"
 
 namespace arrow {
 namespace json {
 
 using internal::StringConverter;
 
-Kind::type KindFromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
-  std::string kind_name = tag->value(0);
-  switch (kind_name[0]) {
-    case 'n':
-      if (kind_name[2] == 'l') {
-        return Kind::kNull;
-      } else {
-        return Kind::kNumber;
-      }
-    case 'b':
-      return Kind::kBoolean;
-    case 's':
-      return Kind::kString;
-    case 'o':
-      return Kind::kObject;
-    case 'a':
-      return Kind::kArray;
-    default:
-      ARROW_LOG(FATAL);
-      return Kind::kNull;
-  }
-}
-
-static Status Convert(const std::shared_ptr<DataType>& out_type,
-                      std::shared_ptr<Array> in, std::shared_ptr<Array>* out);
-
 struct ConvertImpl {
   Status Visit(const NullType&) {
     *out = in;
@@ -177,8 +156,8 @@ struct ConvertImpl {
   std::shared_ptr<Array>* out;
 };
 
-static Status Convert(const std::shared_ptr<DataType>& out_type,
-                      std::shared_ptr<Array> in, std::shared_ptr<Array>* out) {
+Status Convert(const std::shared_ptr<DataType>& out_type,
+               const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) {
   ConvertImpl visitor = {out_type, in, out};
   return VisitTypeInline(*out_type, &visitor);
 }
@@ -187,7 +166,7 @@ static Status InferAndConvert(std::shared_ptr<DataType> expected,
                               const std::shared_ptr<const KeyValueMetadata>& tag,
                               const std::shared_ptr<Array>& in,
                               std::shared_ptr<Array>* out) {
-  Kind::type kind = KindFromTag(tag);
+  Kind::type kind = Kind::FromTag(tag);
   switch (kind) {
     case Kind::kObject: {
       // FIXME(bkietz) in general expected fields may not be an exact prefix of parsed's
@@ -269,18 +248,20 @@ static Status InferAndConvert(std::shared_ptr<DataType> expected,
 
 Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
                 std::shared_ptr<RecordBatch>* out) {
-  BlockParser parser(default_memory_pool(), options, json);
-  RETURN_NOT_OK(parser.Parse(json));
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(options, &parser));
+  RETURN_NOT_OK(parser->Parse(json));
   std::shared_ptr<Array> parsed;
-  RETURN_NOT_OK(parser.Finish(&parsed));
+  RETURN_NOT_OK(parser->Finish(&parsed));
   std::shared_ptr<Array> converted;
   auto schm = options.explicit_schema;
   if (options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
     if (schm) {
-      RETURN_NOT_OK(InferAndConvert(struct_(schm->fields()), Tag(Kind::kObject), parsed,
-                                    &converted));
+      RETURN_NOT_OK(InferAndConvert(struct_(schm->fields()), Kind::Tag(Kind::kObject),
+                                    parsed, &converted));
     } else {
-      RETURN_NOT_OK(InferAndConvert(nullptr, Tag(Kind::kObject), parsed, &converted));
+      RETURN_NOT_OK(
+          InferAndConvert(nullptr, Kind::Tag(Kind::kObject), parsed, &converted));
     }
     schm = schema(converted->type()->children());
   } else {
diff --git a/cpp/src/arrow/json/reader.h b/cpp/src/arrow/json/reader.h
index 6463793..51a3473 100644
--- a/cpp/src/arrow/json/reader.h
+++ b/cpp/src/arrow/json/reader.h
@@ -15,23 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_JSON_READER_H
-#define ARROW_JSON_READER_H
+#pragma once
 
 #include <memory>
-#include <string>
-#include <utility>
-#include <vector>
 
-#include "arrow/json/options.h"  // IWYU pragma: keep
-#include "arrow/json/parser.h"   // IWYU pragma: keep
+#include "arrow/json/options.h"
 #include "arrow/status.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+class Buffer;
 class MemoryPool;
 class Table;
+class RecordBatch;
+class Array;
+class DataType;
 
 namespace io {
 class InputStream;
@@ -45,7 +45,6 @@ class ARROW_EXPORT TableReader {
 
   virtual Status Read(std::shared_ptr<Table>* out) = 0;
 
-  // XXX pass optional schema?
   static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
                      const ReadOptions&, const ParseOptions&,
                      std::shared_ptr<TableReader>* out);
@@ -54,7 +53,10 @@ class ARROW_EXPORT TableReader {
 ARROW_EXPORT Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
                              std::shared_ptr<RecordBatch>* out);
 
+/// \brief convert an Array produced by BlockParser into an Array of out_type
+ARROW_EXPORT Status Convert(const std::shared_ptr<DataType>& out_type,
+                            const std::shared_ptr<Array>& in,
+                            std::shared_ptr<Array>* out);
+
 }  // namespace json
 }  // namespace arrow
-
-#endif  // ARROW_JSON_READER_H
diff --git a/cpp/src/arrow/json/test-common.h b/cpp/src/arrow/json/test-common.h
index 0e7f6e3..16f43eb 100644
--- a/cpp/src/arrow/json/test-common.h
+++ b/cpp/src/arrow/json/test-common.h
@@ -15,36 +15,47 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <memory>
 #include <random>
 #include <string>
 #include <vector>
 
-#include <rapidjson/writer.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/reader.h"
+#include "rapidjson/writer.h"
 
+#include "arrow/io/memory.h"
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
 #include "arrow/testing/gtest_util.h"
-#include "arrow/testing/util.h"
+#include "arrow/type.h"
 #include "arrow/util/string_view.h"
 #include "arrow/visitor_inline.h"
 
 namespace arrow {
 namespace json {
 
-using rapidjson::StringBuffer;
-using Writer = rapidjson::Writer<StringBuffer>;
+namespace rj = arrow::rapidjson;
+
+using rj::StringBuffer;
+using util::string_view;
+using Writer = rj::Writer<StringBuffer>;
 
 inline static Status OK(bool ok) { return ok ? Status::OK() : Status::Invalid(""); }
 
 template <typename Engine>
-static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer);
+inline static Status Generate(const std::shared_ptr<DataType>& type, Engine& e,
+                              Writer* writer);
 
 template <typename Engine>
-static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
-                       Writer* writer);
+inline static Status Generate(const std::vector<std::shared_ptr<Field>>& fields,
+                              Engine& e, Writer* writer);
 
 template <typename Engine>
-static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e, Writer* writer) {
+inline static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e,
+                              Writer* writer) {
   return Generate(schm->fields(), e, writer);
 }
 
@@ -89,16 +100,17 @@ struct GenerateImpl {
   Status Visit(const ListType& t) {
     auto size = std::poisson_distribution<>{4}(e);
     writer.StartArray();
-    for (int i = 0; i != size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
+    for (int i = 0; i < size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
     return OK(writer.EndArray(size));
   }
   Status Visit(const StructType& t) { return Generate(t.children(), e, &writer); }
   Engine& e;
-  rapidjson::Writer<rapidjson::StringBuffer>& writer;
+  rj::Writer<rj::StringBuffer>& writer;
 };
 
 template <typename Engine>
-static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer) {
+inline static Status Generate(const std::shared_ptr<DataType>& type, Engine& e,
+                              Writer* writer) {
   if (std::uniform_real_distribution<>{0, 1}(e) < .2) {
     // one out of 5 chance of null, anywhere
     writer->Null();
@@ -109,8 +121,8 @@ static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer*
 }
 
 template <typename Engine>
-static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
-                       Writer* writer) {
+inline static Status Generate(const std::vector<std::shared_ptr<Field>>& fields,
+                              Engine& e, Writer* writer) {
   RETURN_NOT_OK(OK(writer->StartObject()));
   for (const auto& f : fields) {
     writer->Key(f->name().c_str());
@@ -119,21 +131,22 @@ static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine
   return OK(writer->EndObject(static_cast<int>(fields.size())));
 }
 
-Status MakeBuffer(util::string_view data, std::shared_ptr<Buffer>* out) {
-  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), data.size(), out));
-  std::copy(std::begin(data), std::end(data), (*out)->mutable_data());
+inline static Status MakeStream(string_view src_str,
+                                std::shared_ptr<io::InputStream>* out) {
+  auto src = std::make_shared<Buffer>(src_str);
+  *out = std::make_shared<io::BufferReader>(src);
   return Status::OK();
 }
 
 // scalar values (numbers and strings) are parsed into a
 // dictionary<index:int32, value:string>. This can be decoded for ease of comparison
-Status DecodeStringDictionary(const DictionaryArray& dict_array,
-                              std::shared_ptr<Array>* decoded) {
+inline static Status DecodeStringDictionary(const DictionaryArray& dict_array,
+                                            std::shared_ptr<Array>* decoded) {
   const StringArray& dict = static_cast<const StringArray&>(*dict_array.dictionary());
   const Int32Array& indices = static_cast<const Int32Array&>(*dict_array.indices());
   StringBuilder builder;
   RETURN_NOT_OK(builder.Resize(indices.length()));
-  for (int64_t i = 0; i != indices.length(); ++i) {
+  for (int64_t i = 0; i < indices.length(); ++i) {
     if (indices.IsNull(i)) {
       builder.UnsafeAppendNull();
       continue;
@@ -145,5 +158,23 @@ Status DecodeStringDictionary(const DictionaryArray& dict_array,
   return builder.Finish(decoded);
 }
 
+inline static Status ParseFromString(ParseOptions options, string_view src_str,
+                                     std::shared_ptr<Array>* parsed) {
+  auto src = std::make_shared<Buffer>(src_str);
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(options, &parser));
+  RETURN_NOT_OK(parser->Parse(src));
+  return parser->Finish(parsed);
+}
+
+std::string PrettyPrint(string_view one_line) {
+  rj::Document document;
+  document.Parse(one_line.data());
+  rj::StringBuffer sb;
+  rj::PrettyWriter<rj::StringBuffer> writer(sb);
+  document.Accept(writer);
+  return sb.GetString();
+}
+
 }  // namespace json
 }  // namespace arrow
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index a8d6214..2e9483a 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -299,69 +299,84 @@ struct is_8bit_int {
       (std::is_same<UInt8Type, T>::value || std::is_same<Int8Type, T>::value);
 };
 
-template <typename T>
-using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value, R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_primitive_ctype =
-    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value>::type;
+    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value, R>::type;
 
-template <typename T>
-using enable_if_date = typename std::enable_if<std::is_base_of<DateType, T>::value>::type;
-
-template <typename T, typename U = void>
+template <typename T, typename R = void>
 using enable_if_integer =
-    typename std::enable_if<std::is_base_of<Integer, T>::value, U>::type;
+    typename std::enable_if<std::is_base_of<Integer, T>::value, R>::type;
 
 template <typename T>
+using is_signed_integer =
+    std::integral_constant<bool, std::is_base_of<Integer, T>::value &&
+                                     std::is_signed<typename T::c_type>::value>;
+
+template <typename T, typename R = void>
 using enable_if_signed_integer =
-    typename std::enable_if<std::is_base_of<Integer, T>::value &&
-                            std::is_signed<typename T::c_type>::value>::type;
+    typename std::enable_if<is_signed_integer<T>::value, R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_unsigned_integer =
     typename std::enable_if<std::is_base_of<Integer, T>::value &&
-                            std::is_unsigned<typename T::c_type>::value>::type;
+                                std::is_unsigned<typename T::c_type>::value,
+                            R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_floating_point =
-    typename std::enable_if<std::is_base_of<FloatingPoint, T>::value>::type;
+    typename std::enable_if<std::is_base_of<FloatingPoint, T>::value, R>::type;
 
 template <typename T>
-using enable_if_time = typename std::enable_if<std::is_base_of<TimeType, T>::value>::type;
+using is_date = std::is_base_of<DateType, T>;
 
-template <typename T>
-using enable_if_timestamp =
-    typename std::enable_if<std::is_base_of<TimestampType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_date = typename std::enable_if<is_date<T>::value, R>::type;
 
 template <typename T>
-using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value>::type;
+using is_time = std::is_base_of<TimeType, T>;
 
-template <typename T>
-using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_time = typename std::enable_if<is_time<T>::value, R>::type;
 
 template <typename T>
+using is_timestamp = std::is_base_of<TimestampType, T>;
+
+template <typename T, typename R = void>
+using enable_if_timestamp = typename std::enable_if<is_timestamp<T>::value, R>::type;
+
+template <typename T, typename R = void>
+using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value, R>::type;
+
+template <typename T, typename R = void>
+using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value, R>::type;
+
+template <typename T, typename R = void>
 using enable_if_binary =
-    typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type;
+    typename std::enable_if<std::is_base_of<BinaryType, T>::value, R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_boolean =
-    typename std::enable_if<std::is_same<BooleanType, T>::value>::type;
+    typename std::enable_if<std::is_same<BooleanType, T>::value, R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_binary_like =
     typename std::enable_if<std::is_base_of<BinaryType, T>::value ||
-                            std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+                                std::is_base_of<FixedSizeBinaryType, T>::value,
+                            R>::type;
 
-template <typename T>
+template <typename T, typename R = void>
 using enable_if_fixed_size_binary =
-    typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+    typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value, R>::type;
 
-template <typename T>
-using enable_if_list = typename std::enable_if<std::is_base_of<ListType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_list =
+    typename std::enable_if<std::is_base_of<ListType, T>::value, R>::type;
 
-template <typename T>
-using enable_if_number = typename std::enable_if<is_number<T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_number = typename std::enable_if<is_number<T>::value, R>::type;
 
 namespace detail {
 
diff --git a/cpp/src/arrow/util/parsing.h b/cpp/src/arrow/util/parsing.h
index 9bd1023..dd3c074 100644
--- a/cpp/src/arrow/util/parsing.h
+++ b/cpp/src/arrow/util/parsing.h
@@ -55,6 +55,8 @@ class StringConverter;
 template <>
 class StringConverter<BooleanType> {
  public:
+  explicit StringConverter(const std::shared_ptr<DataType>& = NULLPTR) {}
+
   using value_type = bool;
 
   bool operator()(const char* s, size_t length, value_type* out) {
@@ -97,7 +99,7 @@ class StringToFloatConverterMixin {
  public:
   using value_type = typename ARROW_TYPE::c_type;
 
-  StringToFloatConverterMixin()
+  explicit StringToFloatConverterMixin(const std::shared_ptr<DataType>& = NULLPTR)
       : main_converter_(flags_, main_junk_value_, main_junk_value_, "inf", "nan"),
         fallback_converter_(flags_, fallback_junk_value_, fallback_junk_value_, "inf",
                             "nan") {}
@@ -148,10 +150,14 @@ class StringToFloatConverterMixin {
 };
 
 template <>
-class StringConverter<FloatType> : public StringToFloatConverterMixin<FloatType> {};
+class StringConverter<FloatType> : public StringToFloatConverterMixin<FloatType> {
+  using StringToFloatConverterMixin<FloatType>::StringToFloatConverterMixin;
+};
 
 template <>
-class StringConverter<DoubleType> : public StringToFloatConverterMixin<DoubleType> {};
+class StringConverter<DoubleType> : public StringToFloatConverterMixin<DoubleType> {
+  using StringToFloatConverterMixin<DoubleType>::StringToFloatConverterMixin;
+};
 
 // NOTE: HalfFloatType would require a half<->float conversion library
 
@@ -277,6 +283,9 @@ class StringToUnsignedIntConverterMixin {
  public:
   using value_type = typename ARROW_TYPE::c_type;
 
+  explicit StringToUnsignedIntConverterMixin(const std::shared_ptr<DataType>& = NULLPTR) {
+  }
+
   bool operator()(const char* s, size_t length, value_type* out) {
     if (ARROW_PREDICT_FALSE(length == 0)) {
       return false;
@@ -291,18 +300,23 @@ class StringToUnsignedIntConverterMixin {
 };
 
 template <>
-class StringConverter<UInt8Type> : public StringToUnsignedIntConverterMixin<UInt8Type> {};
+class StringConverter<UInt8Type> : public StringToUnsignedIntConverterMixin<UInt8Type> {
+  using StringToUnsignedIntConverterMixin<UInt8Type>::StringToUnsignedIntConverterMixin;
+};
 
 template <>
 class StringConverter<UInt16Type> : public StringToUnsignedIntConverterMixin<UInt16Type> {
+  using StringToUnsignedIntConverterMixin<UInt16Type>::StringToUnsignedIntConverterMixin;
 };
 
 template <>
 class StringConverter<UInt32Type> : public StringToUnsignedIntConverterMixin<UInt32Type> {
+  using StringToUnsignedIntConverterMixin<UInt32Type>::StringToUnsignedIntConverterMixin;
 };
 
 template <>
 class StringConverter<UInt64Type> : public StringToUnsignedIntConverterMixin<UInt64Type> {
+  using StringToUnsignedIntConverterMixin<UInt64Type>::StringToUnsignedIntConverterMixin;
 };
 
 template <class ARROW_TYPE>
@@ -311,6 +325,8 @@ class StringToSignedIntConverterMixin {
   using value_type = typename ARROW_TYPE::c_type;
   using unsigned_type = typename std::make_unsigned<value_type>::type;
 
+  explicit StringToSignedIntConverterMixin(const std::shared_ptr<DataType>& = NULLPTR) {}
+
   bool operator()(const char* s, size_t length, value_type* out) {
     static constexpr unsigned_type max_positive =
         static_cast<unsigned_type>(std::numeric_limits<value_type>::max());
@@ -356,16 +372,24 @@ class StringToSignedIntConverterMixin {
 };
 
 template <>
-class StringConverter<Int8Type> : public StringToSignedIntConverterMixin<Int8Type> {};
+class StringConverter<Int8Type> : public StringToSignedIntConverterMixin<Int8Type> {
+  using StringToSignedIntConverterMixin<Int8Type>::StringToSignedIntConverterMixin;
+};
 
 template <>
-class StringConverter<Int16Type> : public StringToSignedIntConverterMixin<Int16Type> {};
+class StringConverter<Int16Type> : public StringToSignedIntConverterMixin<Int16Type> {
+  using StringToSignedIntConverterMixin<Int16Type>::StringToSignedIntConverterMixin;
+};
 
 template <>
-class StringConverter<Int32Type> : public StringToSignedIntConverterMixin<Int32Type> {};
+class StringConverter<Int32Type> : public StringToSignedIntConverterMixin<Int32Type> {
+  using StringToSignedIntConverterMixin<Int32Type>::StringToSignedIntConverterMixin;
+};
 
 template <>
-class StringConverter<Int64Type> : public StringToSignedIntConverterMixin<Int64Type> {};
+class StringConverter<Int64Type> : public StringToSignedIntConverterMixin<Int64Type> {
+  using StringToSignedIntConverterMixin<Int64Type>::StringToSignedIntConverterMixin;
+};
 
 template <>
 class StringConverter<TimestampType> {
diff --git a/cpp/src/arrow/util/sse-util.h b/cpp/src/arrow/util/sse-util.h
index 15e7c99..b470b41 100644
--- a/cpp/src/arrow/util/sse-util.h
+++ b/cpp/src/arrow/util/sse-util.h
@@ -18,11 +18,9 @@
 // From Apache Impala as of 2016-01-29. Pared down to a minimal set of
 // functions needed for parquet-cpp
 
-#ifndef ARROW_UTIL_SSE_UTIL_H
-#define ARROW_UTIL_SSE_UTIL_H
+#pragma once
 
-#undef ARROW_HAVE_SSE2
-#undef ARROW_HAVE_SSE4_2
+#include "arrow/util/macros.h"
 
 #ifdef ARROW_USE_SIMD
 
@@ -36,7 +34,7 @@
 
 // gcc/clang (possibly others)
 
-#if defined(__SSE4_2__)
+#if defined(__SSE2__)
 #define ARROW_HAVE_SSE2 1
 #include <emmintrin.h>
 #endif
@@ -46,7 +44,9 @@
 #include <nmmintrin.h>
 #endif
 
-#endif
+#endif  // ARROW_USE_SIMD
+
+// MSVC x86-64
 
 namespace arrow {
 
@@ -155,5 +155,3 @@ static inline uint32_t SSE4_crc32_u64(uint32_t, uint64_t) {
 #endif  // ARROW_HAVE_SSE4_2
 
 }  // namespace arrow
-
-#endif  //  ARROW_UTIL_SSE_UTIL_H